diff --git a/lib/cli.js b/lib/cli.js index 5fc479bf..f510a4d2 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -28,10 +28,6 @@ program , 'name (or os.hostname())' , String , os.hostname()) - .option('-t, --restart-threshold ' - , 'restart worker only if it stays alive for longer than this' - , Number - , 10000) .option('--min-port ' , 'minimum port number for worker use' , Number @@ -57,8 +53,6 @@ program require('./roles/provider')({ name: options.name - , restartThreshold: options.restartThreshold - , restartTimeout: 1000 , killTimeout: 10000 , ports: cliutil.range(options.minPort, options.maxPort) , filter: function(device) { diff --git a/lib/roles/provider.js b/lib/roles/provider.js index 91ca3a5d..8c6327d4 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -9,12 +9,12 @@ var _ = require('lodash') var logger = require('../util/logger') var wire = require('../wire') var wireutil = require('../wire/util') +var procutil = require('../util/procutil') module.exports = function(options) { var log = logger.createLogger('provider') var client = Promise.promisifyAll(adb.createClient()) - var workers = Object.create(null) - var tracker = new events.EventEmitter() + var workers = {} var lists = { all: [] , ready: [] @@ -58,300 +58,243 @@ module.exports = function(options) { push.connect(endpoint) }) - tracker.on('add', function(device) { - lists.all.push(device.id) - push.send([ - wireutil.global - , wireutil.envelope(new wire.DevicePresentMessage( - device.id - , options.name - , wireutil.toDeviceStatus(device.type) - )) - ]) - maybeConnect(device) - }) + // Track and manage devices + client.trackDevicesAsync().then(function(tracker) { + log.info('Tracking devices') - tracker.on('change', function(device) { - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceStatusMessage( - device.id - , wireutil.toDeviceStatus(device.type) - )) - ]) - maybeConnect(device) || maybeDisconnect(device) - }) - - tracker.on('remove', function(device) { - _.pull(lists.all, device.id) - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceAbsentMessage( - device.id - )) - ]) - maybeDisconnect(device) - }) - - client.trackDevicesAsync() - .then(function(unfilteredTracker) { - log.info('Tracking devices') - - unfilteredTracker.on('add', function(device) { - if (isWantedDevice(device)) { - log.info('Found device "%s" (%s)', device.id, device.type) - tracker.emit('add', device) + // Helper for ignoring unwanted devices + function filterDevice(listener) { + if (options.filter) { + return function(device) { + if (options.filter(device)) { + listener(device) + } } - else { - log.info('Ignoring device "%s" (%s)', device.id, device.type) - } - }) - - unfilteredTracker.on('change', function(device) { - if (isWantedDevice(device)) { - log.info('Device "%s" is now "%s"', device.id, device.type) - tracker.emit('change', device) - } - }) - - unfilteredTracker.on('remove', function(device) { - if (isWantedDevice(device)) { - log.info('Lost device "%s" (%s)', device.id, device.type) - tracker.emit('remove', device) - } - }) - }) - - function pushDeviceStatus(device, type) { - push.send([wireutil.global, - wireutil.makeDeviceStatusMessage(device.id, type, options.name)]) - } - - function isWantedDevice(device) { - return options.filter ? options.filter(device) : true - } - - function isConnectable(device) { - switch (device.type) { - case 'device': - case 'emulator': - return true - default: - return false + } + return listener } - } - function isConnected(device) { - return workers[device.id] - } + // To make things easier, we're going to cheat a little, and make all + // device events go to their own EventEmitters. This way we can keep all + // device data in the same scope. + var flippedTracker = new events.EventEmitter() - function maybeConnect(device) { - if (isConnectable(device) && !isConnected(device)) { - log.info('Spawning device worker "%s"', device.id) - var ports = options.ports.splice(0, 2) - , proc = options.fork(device, ports) + tracker.on('add', filterDevice(function(device) { + log.info('Found device "%s" (%s)', device.id, device.type) - function messageListener(message) { - switch (message) { - case 'ready': - _.pull(lists.waiting, device.id) - lists.ready.push(device.id) - break - default: - log.warn( - 'Unknown message from worker "%s": "%s"' - , device.id - , message - ) - break - } - } - - function errorListener(err) { - log.error( - 'Device worker "%s" had an error: %s' - , device.id - , err.message - ) - } - - function exitListener(code, signal) { - var worker = cleanupWorker(device.id) - switch (code) { - case 0: - log.info('Device worker "%s" stopped cleanly', device.id) - break - case 143: // SIGTERM - log.warn('Device worker "%s" was killed before becoming operational' - , device.id) - break - default: - if (Date.now() - worker.started < options.restartThreshold) { - log.error( - 'Device worker "%s" died with exit code %d, ' + - 'NOT restarting due to threshold of %dms not being met' - , device.id - , code - , options.restartThreshold - ) - } - else { - log.error( - 'Device worker "%s" died with exit code %d, ' + - 'attempting to restart in %dms if device is still around' - , device.id - , code - , options.restartTimeout - ) - waitForAnyChanges(device) - .timeout(options.restartTimeout) - .then(function(device) { - // Most likely we lost the device, but our tracker didn't - // see it before the process died - log.warn( - 'Not restarting device worker "%s" due to tracker ' + - 'activity (but the change may cause it to start)' - , device.id - ) - }) - .catch(function() { - log.info('Restarting device worker "%s"', device.id) - maybeConnect(device) - }) - } - break - } - } - - proc.on('error', errorListener) - proc.on('exit', exitListener) - proc.on('message', messageListener) - - workers[device.id] = { - device: device - , proc: proc - , started: Date.now() - , ports: ports - , unbind: function() { - proc.removeListener('error', errorListener) - proc.removeListener('exit', exitListener) - proc.removeListener('message', messageListener) - } - } - - lists.waiting.push(device.id) + // Tell others we found a device + push.send([ + wireutil.global + , wireutil.envelope(new wire.DevicePresentMessage( + device.id + , options.name + , wireutil.toDeviceStatus(device.type) + )) + ]) + // Statistics + lists.all.push(device.id) delayedTotals() - return true - } - return false - } + var privateTracker = new events.EventEmitter() + , resolver = Promise.defer() + , timer + , worker - function maybeDisconnect(device) { - if (isConnected(device)) { - log.info('Releasing device worker "%s"', device.id) - gracefullyKillWorker(device.id) - return true - } - return false - } - - function waitForAnyChanges(device) { - var resolver = Promise.defer() - - function maybeResolve(otherDevice) { - if (otherDevice.id === device.id) { - resolver.resolve(otherDevice) + // When any event occurs on the added device + function deviceListener(type, device) { + // Okay, this is a bit unnecessary but it allows us to get rid of an + // ugly switch statement and return to the original style. + privateTracker.emit(type, device) } - } - tracker.on('add', maybeResolve) - tracker.on('change', maybeResolve) - tracker.on('remove', maybeResolve) + // When the added device changes + function changeListener(device) { + log.info('Device "%s" is now "%s"', device.id, device.type) - return resolver.promise.finally(function() { - tracker.removeListener('add', maybeResolve) - tracker.removeListener('change', maybeResolve) - tracker.removeListener('remove', maybeResolve) - }) - } + // Tell others the device changed + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceStatusMessage( + device.id + , wireutil.toDeviceStatus(device.type) + )) + ]) - function tryKillWorker(id) { - var deferred = Promise.defer() - , worker = workers[id] + check(device) + } - function onExit() { - cleanupWorker(id) - log.info('Gracefully killed device worker "%s"', id) - deferred.resolve() - } + // When the added device gets removed + function removeListener(device) { + log.info('Lost device "%s" (%s)', device.id, device.type) - worker.unbind() - worker.proc.once('exit', onExit) - worker.proc.kill('SIGTERM') + clearTimeout(timer) + flippedTracker.removeListener(device.id, deviceListener) + _.pull(lists.all, device.id) + delayedTotals() - return deferred.promise.finally(function() { - worker.proc.removeListener('exit', onExit) - }) - } + // Tell others the device is gone + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceAbsentMessage( + device.id + )) + ]) - function forceKillWorker(id) { - log.warn('Force killing device worker "%s"', id) + stop() + } - var deferred = Promise.defer() - , worker = workers[id] + // Check if we can do anything with the device + function check(device) { + clearTimeout(timer) + switch (device.type) { + case 'device': + case 'emulator': + timer = setTimeout(work, 100) + break + default: + stop() + break + } + } - function onExit() { - cleanupWorker(id) - log.warn('Force killed device worker "%s"', id) - deferred.resolve() - } + // Starts a device worker and keeps it alive + function work() { + return worker = workers[device.id] = spawn(device) + .then(function() { + log.info('Device worker "%s" has retired', device.id) + worker = workers[device.id] = null + }) + .catch(procutil.ExitError, function(err) { + log.info('Restarting device worker "%s"', device.id) + return Promise.delay(500) + .then(function() { + return work() + }) + }) + } - worker.unbind() - worker.proc.once('exit', onExit) - worker.proc.kill('SIGKILL') + // No more work required + function stop() { + if (worker) { + log.info('Shutting down device worker "%s"', device.id) + worker.cancel() + } + } - return deferred.promise.finally(function() { - worker.proc.removeListener('exit', onExit) - }) - } + // Spawn a device worker + function spawn(device) { + var ports = options.ports.splice(0, 2) + , proc = options.fork(device, ports) + , resolver = Promise.defer() - function gracefullyKillWorker(id) { - return tryKillWorker(id) - .timeout(options.killTimeout) - .catch(function() { - log.error('Device worker "%s" did not stop in time', id) - return forceKillWorker(id) - .timeout(options.killTimeout) - }) - } + function exitListener(code, signal) { + if (signal) { + log.warn( + 'Device worker "%s" was killed with signal %s, assuming ' + + 'deliberate action and not restarting' + , device.id + , signal + ) + resolver.resolve() + } + else if (code === 0) { + log.info('Device worker "%s" stopped cleanly', device.id) + resolver.resolve() + } + else { + log.error( + 'Device worker "%s" died with code %s' + , device.id + , code + ) + resolver.reject(new procutil.ExitError(code)) + } + } + + function errorListener(err) { + log.error( + 'Device worker "%s" had an error: %s' + , device.id + , err.message + ) + } + + function messageListener(message) { + switch (message) { + case 'ready': + _.pull(lists.waiting, device.id) + lists.ready.push(device.id) + break + default: + log.warn( + 'Unknown message from device worker "%s": "%s"' + , device.id + , message + ) + break + } + } + + proc.on('exit', exitListener) + proc.on('error', errorListener) + proc.on('message', messageListener) + + return resolver.promise + .finally(function() { + log.info('Cleaning up device worker "%s"', device.id) + + proc.removeListener('exit', exitListener) + proc.removeListener('error', errorListener) + proc.removeListener('message', messageListener) + + // Return used ports to the main pool + Array.prototype.push.apply(options.ports, ports) + + // Update lists + _.pull(lists.ready, device.id) + _.pull(lists.waiting, device.id) + }) + .cancellable() + .catch(Promise.CancellationError, function(err) { + log.info('Gracefully killing device worker "%s"', device.id) + return procutil.gracefullyKill(proc, options.killTimeout) + }) + .catch(Promise.TimeoutError, function(err) { + log.error( + 'Device worker "%s" did not stop in time: %s' + , device.id + , err.message + ) + }) + } + + flippedTracker.on(device.id, deviceListener) + privateTracker.on('change', changeListener) + privateTracker.on('remove', removeListener) + check(device) + })) + + tracker.on('change', filterDevice(function(device) { + flippedTracker.emit(device.id, 'change', device) + })) + + tracker.on('remove', filterDevice(function(device) { + flippedTracker.emit(device.id, 'remove', device) + })) + }) function gracefullyExit() { log.info('Stopping all workers') - Promise.all(Object.keys(workers).map(gracefullyKillWorker)) + Promise.all(Object.keys(workers).map(function(serial) { + return workers[serial].cancel() + })) .done(function() { log.info('All cleaned up') process.exit(0) }) } - function cleanupWorker(id) { - var worker = workers[id] - delete workers[id] - Array.prototype.push.apply(options.ports, worker.ports) - _.pull(lists.ready, id) - _.pull(lists.waiting, id) - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceAbsentMessage( - id - )) - ]) - delayedTotals() - return worker - } - process.on('SIGINT', function(e) { log.info('Received SIGINT') gracefullyExit() diff --git a/lib/util/procutil.js b/lib/util/procutil.js index d363aa48..16779103 100644 --- a/lib/util/procutil.js +++ b/lib/util/procutil.js @@ -34,3 +34,28 @@ module.exports.fork = function() { }) }) } + +// Export +module.exports.gracefullyKill = function(proc, timeout) { + function killer(signal) { + var deferred = Promise.defer() + + function onExit() { + deferred.resolve() + } + + proc.once('exit', onExit) + proc.kill(signal) + + return deferred.promise.finally(function() { + proc.removeListener('exit', onExit) + }) + } + + return killer('SIGTERM') + .timeout(timeout) + .catch(function() { + return killer('SIGKILL') + .timeout(timeout) + }) +}