From 816904b9bbf170f767f9f5473cf18dd97777dd1a Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Thu, 30 Jan 2014 12:15:22 +0900 Subject: [PATCH] React to device reboots more gracefully. Previously, the provider would attempt to restart the worker as soon as an essential service died, even if the device was soon released after. Now that doesn't happen. --- lib/cli.js | 7 +++ lib/roles/provider.js | 126 +++++++++++++++++++++++++++++++++--------- 2 files changed, 106 insertions(+), 27 deletions(-) diff --git a/lib/cli.js b/lib/cli.js index 22b34df8..0e4e32b3 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -24,6 +24,10 @@ program , 'name (or os.hostname())' , String , os.hostname()) + .option('-t, --restart-threshold ' + , 'restart worker only if it stays alive for longer than this' + , Number + , 10000) .action(function() { var serials = cliutil.allUnknownArgs(arguments) , options = cliutil.lastArg(arguments) @@ -37,6 +41,9 @@ program require('./roles/provider')({ name: options.name + , restartThreshold: options.restartThreshold + , restartTimeout: 1000 + , killTimeout: 10000 , filter: function(device) { return serials.length === 0 || serials.indexOf(device.id) !== -1 } diff --git a/lib/roles/provider.js b/lib/roles/provider.js index 2624c29e..97a54f17 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -1,4 +1,5 @@ var path = require('path') +var events = require('events') var adb = require('adbkit') var Promise = require('bluebird') @@ -12,6 +13,7 @@ module.exports = function(options) { var log = logger.createLogger('provider') var client = Promise.promisifyAll(adb.createClient()) var workers = Object.create(null) + var tracker = new events.EventEmitter() // Output var push = zmq.socket('push') @@ -20,34 +22,46 @@ module.exports = function(options) { push.connect(endpoint) }) + tracker.on('add', function(device) { + pushDeviceStatus(device, device.type) + maybeConnect(device) + }) + + tracker.on('change', function(device) { + pushDeviceStatus(device, device.type) + maybeConnect(device) || maybeDisconnect(device) + }) + + tracker.on('remove', function(device) { + pushDeviceStatus(device, 'absent') + maybeDisconnect(device) + }) + client.trackDevicesAsync() - .then(function(tracker) { + .then(function(unfilteredTracker) { log.info('Tracking devices') - tracker.on('add', function(device) { + unfilteredTracker.on('add', function(device) { if (isWantedDevice(device)) { log.info('Found device "%s" (%s)', device.id, device.type) - pushDeviceStatus(device, device.type) - maybeConnect(device) + tracker.emit('add', device) } else { log.info('Ignoring device "%s" (%s)', device.id, device.type) } }) - tracker.on('change', function(device) { + unfilteredTracker.on('change', function(device) { if (isWantedDevice(device)) { log.info('Device "%s" is now "%s"', device.id, device.type) - pushDeviceStatus(device, device.type) - maybeConnect(device) || maybeDisconnect(device) + tracker.emit('change', device) } }) - tracker.on('remove', function(device) { + unfilteredTracker.on('remove', function(device) { if (isWantedDevice(device)) { log.info('Lost device "%s" (%s)', device.id, device.type) - pushDeviceStatus(device, 'absent') - maybeDisconnect(device) + tracker.emit('remove', device) } }) }) @@ -77,13 +91,15 @@ module.exports = function(options) { function maybeConnect(device) { if (isConnectable(device) && !isConnected(device)) { - log.info('Spawning worker for device "%s"', device.id) + log.info('Spawning device worker "%s"', device.id) var proc = options.fork(device) - proc.on('error', function(err) { + + function errorListener(err) { log.error('Device worker "%s" had an error: %s', device.id, err.message) - }) - proc.on('exit', function(code, signal) { + } + + function exitListener(code, signal) { var data = workers[device.id] delete workers[device.id] switch (code) { @@ -95,23 +111,54 @@ module.exports = function(options) { , device.id) break default: - log.error('Device worker "%s" had a dirty exit (code %d)', - device.id, code) - if (Date.now() - data.started < 10000) { - log.error('Device worker "%s" failed within 10 seconds of startup,' + - ' will not attempt to restart', device.id) + if (Date.now() - data.started < options.restartThreshold) { + log.error( + 'Device worker "%s" died with exit code %d, ' + + 'NOT restarting due to threshold of %dms not being met' + , device.id + , code + , options.restartThreshold + ) } else { - log.info('Restarting worker of "%s"', device.id) - maybeConnect(device) + log.error( + 'Device worker "%s" died with exit code %d, ' + + 'attempting to restart in %dms if device is still around' + , device.id + , code + , options.restartTimeout + ) + waitForAnyChanges(device) + .timeout(options.restartTimeout) + .then(function(device) { + // Most likely we lost the device, but our tracker didn't + // see it before the process died + log.warn( + 'Not restarting device worker "%s" due to tracker ' + + 'activity (but the change may cause it to start)' + , device.id + ) + }) + .catch(function() { + log.info('Restarting device worker "%s"', device.id) + maybeConnect(device) + }) } break } - }) + } + + proc.on('error', errorListener) + proc.on('exit', exitListener) + workers[device.id] = { device: device , proc: proc , started: Date.now() + , unbind: function() { + proc.removeListener('error', errorListener) + proc.removeListener('exit', exitListener) + } } return true } @@ -120,21 +167,44 @@ module.exports = function(options) { function maybeDisconnect(device) { if (isConnected(device)) { - log.info('Releasing worker of %s', device.id) + log.info('Releasing device worker "%s"', device.id) gracefullyKillWorker(device.id) return true } return false } + function waitForAnyChanges(device) { + var resolver = Promise.defer() + + function maybeResolve(otherDevice) { + if (otherDevice.id === device.id) { + resolver.resolve(otherDevice) + } + } + + tracker.on('add', maybeResolve) + tracker.on('change', maybeResolve) + tracker.on('remove', maybeResolve) + + return resolver.promise.finally(function() { + tracker.removeListener('add', maybeResolve) + tracker.removeListener('change', maybeResolve) + tracker.removeListener('remove', maybeResolve) + }) + } + function tryKillWorker(id) { var deferred = Promise.defer(), worker = workers[id] function onExit() { + delete workers[id] + log.info('Gracefully killed device worker "%s"', id) deferred.resolve() } + worker.unbind() worker.proc.once('exit', onExit) worker.proc.kill('SIGTERM') @@ -144,15 +214,18 @@ module.exports = function(options) { } function forceKillWorker(id) { - log.warn('Force killing worker of device "%s"', id) + log.warn('Force killing device worker "%s"', id) var deferred = Promise.defer() , worker = workers[id] function onExit() { + delete workers[id] + log.warn('Force killed device worker "%s"', id) deferred.resolve() } + worker.unbind() worker.proc.once('exit', onExit) worker.proc.kill('SIGKILL') @@ -163,12 +236,11 @@ module.exports = function(options) { function gracefullyKillWorker(id) { return tryKillWorker(id) - .timeout(10000) + .timeout(options.killTimeout) .catch(function() { log.error('Device worker "%s" did not stop in time', id) return forceKillWorker(id) - .timeout(10000) - .then(deferred.resolve) + .timeout(options.killTimeout) }) }