diff --git a/lib/cli.js b/lib/cli.js index 22b34df8..0e4e32b3 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -24,6 +24,10 @@ program , 'name (or os.hostname())' , String , os.hostname()) + .option('-t, --restart-threshold ' + , 'restart worker only if it stays alive for longer than this' + , Number + , 10000) .action(function() { var serials = cliutil.allUnknownArgs(arguments) , options = cliutil.lastArg(arguments) @@ -37,6 +41,9 @@ program require('./roles/provider')({ name: options.name + , restartThreshold: options.restartThreshold + , restartTimeout: 1000 + , killTimeout: 10000 , filter: function(device) { return serials.length === 0 || serials.indexOf(device.id) !== -1 } diff --git a/lib/roles/provider.js b/lib/roles/provider.js index 2624c29e..97a54f17 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -1,4 +1,5 @@ var path = require('path') +var events = require('events') var adb = require('adbkit') var Promise = require('bluebird') @@ -12,6 +13,7 @@ module.exports = function(options) { var log = logger.createLogger('provider') var client = Promise.promisifyAll(adb.createClient()) var workers = Object.create(null) + var tracker = new events.EventEmitter() // Output var push = zmq.socket('push') @@ -20,34 +22,46 @@ module.exports = function(options) { push.connect(endpoint) }) + tracker.on('add', function(device) { + pushDeviceStatus(device, device.type) + maybeConnect(device) + }) + + tracker.on('change', function(device) { + pushDeviceStatus(device, device.type) + maybeConnect(device) || maybeDisconnect(device) + }) + + tracker.on('remove', function(device) { + pushDeviceStatus(device, 'absent') + maybeDisconnect(device) + }) + client.trackDevicesAsync() - .then(function(tracker) { + .then(function(unfilteredTracker) { log.info('Tracking devices') - tracker.on('add', function(device) { + unfilteredTracker.on('add', function(device) { if (isWantedDevice(device)) { log.info('Found device "%s" (%s)', device.id, device.type) - pushDeviceStatus(device, device.type) - maybeConnect(device) + tracker.emit('add', device) } else { log.info('Ignoring device "%s" (%s)', device.id, device.type) } }) - tracker.on('change', function(device) { + unfilteredTracker.on('change', function(device) { if (isWantedDevice(device)) { log.info('Device "%s" is now "%s"', device.id, device.type) - pushDeviceStatus(device, device.type) - maybeConnect(device) || maybeDisconnect(device) + tracker.emit('change', device) } }) - tracker.on('remove', function(device) { + unfilteredTracker.on('remove', function(device) { if (isWantedDevice(device)) { log.info('Lost device "%s" (%s)', device.id, device.type) - pushDeviceStatus(device, 'absent') - maybeDisconnect(device) + tracker.emit('remove', device) } }) }) @@ -77,13 +91,15 @@ module.exports = function(options) { function maybeConnect(device) { if (isConnectable(device) && !isConnected(device)) { - log.info('Spawning worker for device "%s"', device.id) + log.info('Spawning device worker "%s"', device.id) var proc = options.fork(device) - proc.on('error', function(err) { + + function errorListener(err) { log.error('Device worker "%s" had an error: %s', device.id, err.message) - }) - proc.on('exit', function(code, signal) { + } + + function exitListener(code, signal) { var data = workers[device.id] delete workers[device.id] switch (code) { @@ -95,23 +111,54 @@ module.exports = function(options) { , device.id) break default: - log.error('Device worker "%s" had a dirty exit (code %d)', - device.id, code) - if (Date.now() - data.started < 10000) { - log.error('Device worker "%s" failed within 10 seconds of startup,' + - ' will not attempt to restart', device.id) + if (Date.now() - data.started < options.restartThreshold) { + log.error( + 'Device worker "%s" died with exit code %d, ' + + 'NOT restarting due to threshold of %dms not being met' + , device.id + , code + , options.restartThreshold + ) } else { - log.info('Restarting worker of "%s"', device.id) - maybeConnect(device) + log.error( + 'Device worker "%s" died with exit code %d, ' + + 'attempting to restart in %dms if device is still around' + , device.id + , code + , options.restartTimeout + ) + waitForAnyChanges(device) + .timeout(options.restartTimeout) + .then(function(device) { + // Most likely we lost the device, but our tracker didn't + // see it before the process died + log.warn( + 'Not restarting device worker "%s" due to tracker ' + + 'activity (but the change may cause it to start)' + , device.id + ) + }) + .catch(function() { + log.info('Restarting device worker "%s"', device.id) + maybeConnect(device) + }) } break } - }) + } + + proc.on('error', errorListener) + proc.on('exit', exitListener) + workers[device.id] = { device: device , proc: proc , started: Date.now() + , unbind: function() { + proc.removeListener('error', errorListener) + proc.removeListener('exit', exitListener) + } } return true } @@ -120,21 +167,44 @@ module.exports = function(options) { function maybeDisconnect(device) { if (isConnected(device)) { - log.info('Releasing worker of %s', device.id) + log.info('Releasing device worker "%s"', device.id) gracefullyKillWorker(device.id) return true } return false } + function waitForAnyChanges(device) { + var resolver = Promise.defer() + + function maybeResolve(otherDevice) { + if (otherDevice.id === device.id) { + resolver.resolve(otherDevice) + } + } + + tracker.on('add', maybeResolve) + tracker.on('change', maybeResolve) + tracker.on('remove', maybeResolve) + + return resolver.promise.finally(function() { + tracker.removeListener('add', maybeResolve) + tracker.removeListener('change', maybeResolve) + tracker.removeListener('remove', maybeResolve) + }) + } + function tryKillWorker(id) { var deferred = Promise.defer(), worker = workers[id] function onExit() { + delete workers[id] + log.info('Gracefully killed device worker "%s"', id) deferred.resolve() } + worker.unbind() worker.proc.once('exit', onExit) worker.proc.kill('SIGTERM') @@ -144,15 +214,18 @@ module.exports = function(options) { } function forceKillWorker(id) { - log.warn('Force killing worker of device "%s"', id) + log.warn('Force killing device worker "%s"', id) var deferred = Promise.defer() , worker = workers[id] function onExit() { + delete workers[id] + log.warn('Force killed device worker "%s"', id) deferred.resolve() } + worker.unbind() worker.proc.once('exit', onExit) worker.proc.kill('SIGKILL') @@ -163,12 +236,11 @@ module.exports = function(options) { function gracefullyKillWorker(id) { return tryKillWorker(id) - .timeout(10000) + .timeout(options.killTimeout) .catch(function() { log.error('Device worker "%s" did not stop in time', id) return forceKillWorker(id) - .timeout(10000) - .then(deferred.resolve) + .timeout(options.killTimeout) }) }