1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 18:29:17 +02:00

New rewrite of provider. By moving to promises we should no longer have issues with double-quits.

This commit is contained in:
Simo Kinnunen 2014-02-04 15:42:52 +09:00
parent 8a23b37deb
commit adaf3da228
3 changed files with 232 additions and 270 deletions

View file

@ -28,10 +28,6 @@ program
, 'name (or os.hostname())' , 'name (or os.hostname())'
, String , String
, os.hostname()) , os.hostname())
.option('-t, --restart-threshold <ms>'
, 'restart worker only if it stays alive for longer than this'
, Number
, 10000)
.option('--min-port <port>' .option('--min-port <port>'
, 'minimum port number for worker use' , 'minimum port number for worker use'
, Number , Number
@ -57,8 +53,6 @@ program
require('./roles/provider')({ require('./roles/provider')({
name: options.name name: options.name
, restartThreshold: options.restartThreshold
, restartTimeout: 1000
, killTimeout: 10000 , killTimeout: 10000
, ports: cliutil.range(options.minPort, options.maxPort) , ports: cliutil.range(options.minPort, options.maxPort)
, filter: function(device) { , filter: function(device) {

View file

@ -9,12 +9,12 @@ var _ = require('lodash')
var logger = require('../util/logger') var logger = require('../util/logger')
var wire = require('../wire') var wire = require('../wire')
var wireutil = require('../wire/util') var wireutil = require('../wire/util')
var procutil = require('../util/procutil')
module.exports = function(options) { module.exports = function(options) {
var log = logger.createLogger('provider') var log = logger.createLogger('provider')
var client = Promise.promisifyAll(adb.createClient()) var client = Promise.promisifyAll(adb.createClient())
var workers = Object.create(null) var workers = {}
var tracker = new events.EventEmitter()
var lists = { var lists = {
all: [] all: []
, ready: [] , ready: []
@ -58,300 +58,243 @@ module.exports = function(options) {
push.connect(endpoint) push.connect(endpoint)
}) })
tracker.on('add', function(device) { // Track and manage devices
lists.all.push(device.id) client.trackDevicesAsync().then(function(tracker) {
push.send([ log.info('Tracking devices')
wireutil.global
, wireutil.envelope(new wire.DevicePresentMessage(
device.id
, options.name
, wireutil.toDeviceStatus(device.type)
))
])
maybeConnect(device)
})
tracker.on('change', function(device) { // Helper for ignoring unwanted devices
push.send([ function filterDevice(listener) {
wireutil.global if (options.filter) {
, wireutil.envelope(new wire.DeviceStatusMessage( return function(device) {
device.id if (options.filter(device)) {
, wireutil.toDeviceStatus(device.type) listener(device)
)) }
])
maybeConnect(device) || maybeDisconnect(device)
})
tracker.on('remove', function(device) {
_.pull(lists.all, device.id)
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceAbsentMessage(
device.id
))
])
maybeDisconnect(device)
})
client.trackDevicesAsync()
.then(function(unfilteredTracker) {
log.info('Tracking devices')
unfilteredTracker.on('add', function(device) {
if (isWantedDevice(device)) {
log.info('Found device "%s" (%s)', device.id, device.type)
tracker.emit('add', device)
} }
else { }
log.info('Ignoring device "%s" (%s)', device.id, device.type) return listener
}
})
unfilteredTracker.on('change', function(device) {
if (isWantedDevice(device)) {
log.info('Device "%s" is now "%s"', device.id, device.type)
tracker.emit('change', device)
}
})
unfilteredTracker.on('remove', function(device) {
if (isWantedDevice(device)) {
log.info('Lost device "%s" (%s)', device.id, device.type)
tracker.emit('remove', device)
}
})
})
function pushDeviceStatus(device, type) {
push.send([wireutil.global,
wireutil.makeDeviceStatusMessage(device.id, type, options.name)])
}
function isWantedDevice(device) {
return options.filter ? options.filter(device) : true
}
function isConnectable(device) {
switch (device.type) {
case 'device':
case 'emulator':
return true
default:
return false
} }
}
function isConnected(device) { // To make things easier, we're going to cheat a little, and make all
return workers[device.id] // device events go to their own EventEmitters. This way we can keep all
} // device data in the same scope.
var flippedTracker = new events.EventEmitter()
function maybeConnect(device) { tracker.on('add', filterDevice(function(device) {
if (isConnectable(device) && !isConnected(device)) { log.info('Found device "%s" (%s)', device.id, device.type)
log.info('Spawning device worker "%s"', device.id)
var ports = options.ports.splice(0, 2)
, proc = options.fork(device, ports)
function messageListener(message) { // Tell others we found a device
switch (message) { push.send([
case 'ready': wireutil.global
_.pull(lists.waiting, device.id) , wireutil.envelope(new wire.DevicePresentMessage(
lists.ready.push(device.id) device.id
break , options.name
default: , wireutil.toDeviceStatus(device.type)
log.warn( ))
'Unknown message from worker "%s": "%s"' ])
, device.id
, message
)
break
}
}
function errorListener(err) {
log.error(
'Device worker "%s" had an error: %s'
, device.id
, err.message
)
}
function exitListener(code, signal) {
var worker = cleanupWorker(device.id)
switch (code) {
case 0:
log.info('Device worker "%s" stopped cleanly', device.id)
break
case 143: // SIGTERM
log.warn('Device worker "%s" was killed before becoming operational'
, device.id)
break
default:
if (Date.now() - worker.started < options.restartThreshold) {
log.error(
'Device worker "%s" died with exit code %d, ' +
'NOT restarting due to threshold of %dms not being met'
, device.id
, code
, options.restartThreshold
)
}
else {
log.error(
'Device worker "%s" died with exit code %d, ' +
'attempting to restart in %dms if device is still around'
, device.id
, code
, options.restartTimeout
)
waitForAnyChanges(device)
.timeout(options.restartTimeout)
.then(function(device) {
// Most likely we lost the device, but our tracker didn't
// see it before the process died
log.warn(
'Not restarting device worker "%s" due to tracker ' +
'activity (but the change may cause it to start)'
, device.id
)
})
.catch(function() {
log.info('Restarting device worker "%s"', device.id)
maybeConnect(device)
})
}
break
}
}
proc.on('error', errorListener)
proc.on('exit', exitListener)
proc.on('message', messageListener)
workers[device.id] = {
device: device
, proc: proc
, started: Date.now()
, ports: ports
, unbind: function() {
proc.removeListener('error', errorListener)
proc.removeListener('exit', exitListener)
proc.removeListener('message', messageListener)
}
}
lists.waiting.push(device.id)
// Statistics
lists.all.push(device.id)
delayedTotals() delayedTotals()
return true var privateTracker = new events.EventEmitter()
} , resolver = Promise.defer()
return false , timer
} , worker
function maybeDisconnect(device) { // When any event occurs on the added device
if (isConnected(device)) { function deviceListener(type, device) {
log.info('Releasing device worker "%s"', device.id) // Okay, this is a bit unnecessary but it allows us to get rid of an
gracefullyKillWorker(device.id) // ugly switch statement and return to the original style.
return true privateTracker.emit(type, device)
}
return false
}
function waitForAnyChanges(device) {
var resolver = Promise.defer()
function maybeResolve(otherDevice) {
if (otherDevice.id === device.id) {
resolver.resolve(otherDevice)
} }
}
tracker.on('add', maybeResolve) // When the added device changes
tracker.on('change', maybeResolve) function changeListener(device) {
tracker.on('remove', maybeResolve) log.info('Device "%s" is now "%s"', device.id, device.type)
return resolver.promise.finally(function() { // Tell others the device changed
tracker.removeListener('add', maybeResolve) push.send([
tracker.removeListener('change', maybeResolve) wireutil.global
tracker.removeListener('remove', maybeResolve) , wireutil.envelope(new wire.DeviceStatusMessage(
}) device.id
} , wireutil.toDeviceStatus(device.type)
))
])
function tryKillWorker(id) { check(device)
var deferred = Promise.defer() }
, worker = workers[id]
function onExit() { // When the added device gets removed
cleanupWorker(id) function removeListener(device) {
log.info('Gracefully killed device worker "%s"', id) log.info('Lost device "%s" (%s)', device.id, device.type)
deferred.resolve()
}
worker.unbind() clearTimeout(timer)
worker.proc.once('exit', onExit) flippedTracker.removeListener(device.id, deviceListener)
worker.proc.kill('SIGTERM') _.pull(lists.all, device.id)
delayedTotals()
return deferred.promise.finally(function() { // Tell others the device is gone
worker.proc.removeListener('exit', onExit) push.send([
}) wireutil.global
} , wireutil.envelope(new wire.DeviceAbsentMessage(
device.id
))
])
function forceKillWorker(id) { stop()
log.warn('Force killing device worker "%s"', id) }
var deferred = Promise.defer() // Check if we can do anything with the device
, worker = workers[id] function check(device) {
clearTimeout(timer)
switch (device.type) {
case 'device':
case 'emulator':
timer = setTimeout(work, 100)
break
default:
stop()
break
}
}
function onExit() { // Starts a device worker and keeps it alive
cleanupWorker(id) function work() {
log.warn('Force killed device worker "%s"', id) return worker = workers[device.id] = spawn(device)
deferred.resolve() .then(function() {
} log.info('Device worker "%s" has retired', device.id)
worker = workers[device.id] = null
})
.catch(procutil.ExitError, function(err) {
log.info('Restarting device worker "%s"', device.id)
return Promise.delay(500)
.then(function() {
return work()
})
})
}
worker.unbind() // No more work required
worker.proc.once('exit', onExit) function stop() {
worker.proc.kill('SIGKILL') if (worker) {
log.info('Shutting down device worker "%s"', device.id)
worker.cancel()
}
}
return deferred.promise.finally(function() { // Spawn a device worker
worker.proc.removeListener('exit', onExit) function spawn(device) {
}) var ports = options.ports.splice(0, 2)
} , proc = options.fork(device, ports)
, resolver = Promise.defer()
function gracefullyKillWorker(id) { function exitListener(code, signal) {
return tryKillWorker(id) if (signal) {
.timeout(options.killTimeout) log.warn(
.catch(function() { 'Device worker "%s" was killed with signal %s, assuming ' +
log.error('Device worker "%s" did not stop in time', id) 'deliberate action and not restarting'
return forceKillWorker(id) , device.id
.timeout(options.killTimeout) , signal
}) )
} resolver.resolve()
}
else if (code === 0) {
log.info('Device worker "%s" stopped cleanly', device.id)
resolver.resolve()
}
else {
log.error(
'Device worker "%s" died with code %s'
, device.id
, code
)
resolver.reject(new procutil.ExitError(code))
}
}
function errorListener(err) {
log.error(
'Device worker "%s" had an error: %s'
, device.id
, err.message
)
}
function messageListener(message) {
switch (message) {
case 'ready':
_.pull(lists.waiting, device.id)
lists.ready.push(device.id)
break
default:
log.warn(
'Unknown message from device worker "%s": "%s"'
, device.id
, message
)
break
}
}
proc.on('exit', exitListener)
proc.on('error', errorListener)
proc.on('message', messageListener)
return resolver.promise
.finally(function() {
log.info('Cleaning up device worker "%s"', device.id)
proc.removeListener('exit', exitListener)
proc.removeListener('error', errorListener)
proc.removeListener('message', messageListener)
// Return used ports to the main pool
Array.prototype.push.apply(options.ports, ports)
// Update lists
_.pull(lists.ready, device.id)
_.pull(lists.waiting, device.id)
})
.cancellable()
.catch(Promise.CancellationError, function(err) {
log.info('Gracefully killing device worker "%s"', device.id)
return procutil.gracefullyKill(proc, options.killTimeout)
})
.catch(Promise.TimeoutError, function(err) {
log.error(
'Device worker "%s" did not stop in time: %s'
, device.id
, err.message
)
})
}
flippedTracker.on(device.id, deviceListener)
privateTracker.on('change', changeListener)
privateTracker.on('remove', removeListener)
check(device)
}))
tracker.on('change', filterDevice(function(device) {
flippedTracker.emit(device.id, 'change', device)
}))
tracker.on('remove', filterDevice(function(device) {
flippedTracker.emit(device.id, 'remove', device)
}))
})
function gracefullyExit() { function gracefullyExit() {
log.info('Stopping all workers') log.info('Stopping all workers')
Promise.all(Object.keys(workers).map(gracefullyKillWorker)) Promise.all(Object.keys(workers).map(function(serial) {
return workers[serial].cancel()
}))
.done(function() { .done(function() {
log.info('All cleaned up') log.info('All cleaned up')
process.exit(0) process.exit(0)
}) })
} }
function cleanupWorker(id) {
var worker = workers[id]
delete workers[id]
Array.prototype.push.apply(options.ports, worker.ports)
_.pull(lists.ready, id)
_.pull(lists.waiting, id)
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceAbsentMessage(
id
))
])
delayedTotals()
return worker
}
process.on('SIGINT', function(e) { process.on('SIGINT', function(e) {
log.info('Received SIGINT') log.info('Received SIGINT')
gracefullyExit() gracefullyExit()

View file

@ -34,3 +34,28 @@ module.exports.fork = function() {
}) })
}) })
} }
// Export
module.exports.gracefullyKill = function(proc, timeout) {
function killer(signal) {
var deferred = Promise.defer()
function onExit() {
deferred.resolve()
}
proc.once('exit', onExit)
proc.kill(signal)
return deferred.promise.finally(function() {
proc.removeListener('exit', onExit)
})
}
return killer('SIGTERM')
.timeout(timeout)
.catch(function() {
return killer('SIGKILL')
.timeout(timeout)
})
}