1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-05 10:39:25 +02:00

Working topo and startup.

This commit is contained in:
Simo Kinnunen 2014-01-13 19:08:30 +09:00
parent 23543ec361
commit 3be72ae8c9
8 changed files with 448 additions and 201 deletions

View file

@ -1,15 +1,165 @@
var program = require('commander')
var pkg = require('../package')
var cliargs = require('./util/cliargs')
program
.version(pkg.version)
program
.command('provider')
.description('run STF provider')
.command('provider [serial..]')
.description('start provider')
.option('-s, --connect-dev-sub <endpoint>', 'device sub endpoint',
cliargs.list)
.option('-p, --connect-dev-push <endpoint>', 'device push endpoint',
cliargs.list)
.action(function() {
require('./provider')
var serials = cliargs.allUnknownArgs(arguments)
, options = cliargs.lastArg(arguments)
if (!options.connectDevSub) {
this.missingArgument('--connect-dev-sub')
}
if (!options.connectDevPush) {
this.missingArgument('--connect-dev-push')
}
require('./roles/provider')({
filter: function(device) {
return serials.length === 0 || serials.indexOf(device.id) !== -1
}
, fork: function(device) {
var fork = require('child_process').fork
return fork(__filename, [
'device', device.id
, '--connect-sub', options.connectDevSub.join(',')
, '--connect-push', options.connectDevPush.join(',')
])
}
})
})
program
.command('device <serial>')
.description('start device worker')
.option('-s, --connect-sub <endpoint>', 'sub endpoint', cliargs.list)
.option('-p, --connect-push <endpoint>', 'push endpoint', cliargs.list)
.action(function(serial, options) {
if (!options.connectSub) {
this.missingArgument('--connect-sub')
}
if (!options.connectPush) {
this.missingArgument('--connect-push')
}
require('./roles/device')({
serial: serial
, endpoints: {
sub: options.connectSub
, push: options.connectPush
}
})
})
program
.command('coordinator <name>')
.description('start coordinator')
.option('-a, --connect-app-dealer <endpoint>', 'app dealer endpoint',
cliargs.list)
.option('-d, --connect-dev-dealer <endpoint>', 'device dealer endpoint',
cliargs.list)
.action(function(name, options) {
if (!options.connectAppDealer) {
this.missingArgument('--connect-app-dealer')
}
if (!options.connectDevDealer) {
this.missingArgument('--connect-dev-dealer')
}
require('./roles/coordinator')({
name: name
, endpoints: {
appDealer: options.connectAppDealer
, devDealer: options.connectDevDealer
}
})
})
program
.command('triproxy <name>')
.description('start triproxy')
.option('-u, --bind-pub <endpoint>', 'pub endpoint',
String, 'tcp://*:7111')
.option('-d, --bind-dealer <endpoint>', 'dealer endpoint',
String, 'tcp://*:7112')
.option('-p, --bind-pull <endpoint>', 'pull endpoint',
String, 'tcp://*:7113')
.action(function(name, options) {
require('./roles/triproxy')({
name: name
, endpoints: {
pub: options.bindPub
, dealer: options.bindDealer
, pull: options.bindPull
}
})
})
program
.command('local [serial..]')
.description('start everything locally')
.option('--bind-app-pub <endpoint>', 'app pub endpoint',
String, 'tcp://127.0.0.1:7111')
.option('--bind-app-dealer <endpoint>', 'app dealer endpoint',
String, 'tcp://127.0.0.1:7112')
.option('--bind-app-pull <endpoint>', 'app pull endpoint',
String, 'tcp://127.0.0.1:7113')
.option('--bind-dev-pub <endpoint>', 'device pub endpoint',
String, 'tcp://127.0.0.1:7114')
.option('--bind-dev-dealer <endpoint>', 'device dealer endpoint',
String, 'tcp://127.0.0.1:7115')
.option('--bind-dev-pull <endpoint>', 'device pull endpoint',
String, 'tcp://127.0.0.1:7116')
.action(function() {
var options = cliargs.lastArg(arguments)
, fork = require('child_process').fork
// app triproxy
fork(__filename, [
'triproxy', 'app001'
, '--bind-pub', options.bindAppPub
, '--bind-dealer', options.bindAppDealer
, '--bind-pull', options.bindAppPull
])
// device triproxy
fork(__filename, [
'triproxy', 'dev001'
, '--bind-pub', options.bindDevPub
, '--bind-dealer', options.bindDevDealer
, '--bind-pull', options.bindDevPull
])
// coordinator one
fork(__filename, [
'coordinator', 'coord001'
, '--connect-app-dealer', options.bindAppDealer
, '--connect-dev-dealer', options.bindDevDealer
])
// coordinator two
fork(__filename, [
'coordinator', 'coord002'
, '--connect-app-dealer', options.bindAppDealer
, '--connect-dev-dealer', options.bindDevDealer
])
// provider
fork(__filename, [
'provider'
, '--connect-dev-sub', options.bindDevPub
, '--connect-dev-push', options.bindDevPull
].concat(cliargs.allUnknownArgs(arguments)))
})
program.parse(process.argv)

View file

@ -1,37 +0,0 @@
var assert = require('assert')
var Promise = require('bluebird')
var logger = require('./util/logger')
var log = logger.createLogger('device')
function readSerialNumber() {
return Promise.try(function() {
assert.ok(process.env.ANDROID_SERIAL,
'Missing environment variable ANDROID_SERIAL')
return process.env.ANDROID_SERIAL
})
}
function gracefullyExit() {
log.info('Bye')
process.exit(0)
}
process.on('SIGINT', function() {
gracefullyExit()
})
process.on('SIGTERM', function() {
gracefullyExit()
})
Promise.spawn(function* () {
var serial = yield readSerialNumber()
// Show serial number in logs
logger.setGlobalIdentifier(serial)
// Report
log.info('Started')
})

View file

@ -1,161 +0,0 @@
var path = require('path')
var fork = require('child_process').fork
var adb = require('adbkit')
var Promise = require('bluebird')
var log = require('./util/logger').createLogger('provider')
var client = adb.createClient()
var workers = Object.create(null)
client.trackDevices(function(err, tracker) {
if (err) {
log.fatal('Unable to track devices: %s', err.message)
throw err
}
log.info('Tracking devices')
tracker.on('add', function(device) {
log.info('Found device "%s" (%s)', device.id, device.type)
maybeConnect(device)
})
tracker.on('change', function(device) {
log.info('Device "%s" is now "%s"', device.id, device.type)
maybeConnect(device) || maybeDisconnect(device)
})
tracker.on('remove', function(device) {
log.info('Lost device "%s" (%s)', device.id, device.type)
maybeDisconnect(device)
})
})
function isConnectable(device) {
switch (device.type) {
case 'device':
case 'emulator':
return true
default:
return false
}
}
function isConnected(device) {
return workers[device.id]
}
function maybeConnect(device) {
if (isConnectable(device) && !isConnected(device)) {
log.info('Spawning worker for device "%s"', device.id)
var proc = fork(path.join(__dirname, 'device'), {
env: {
ANDROID_SERIAL: device.id
}
})
proc.on('error', function(err) {
log.error('Device worker "%s" had an error: %s',
device.id, err.message)
})
proc.on('exit', function(code, signal) {
var data = workers[device.id]
delete workers[device.id]
if (code === 0) {
log.info('Device worker "%s" stopped cleanly', device.id)
}
else {
log.error('Device worker "%s" had a dirty exit (code %d)',
device.id, code)
if (Date.now() - data.started < 10000) {
log.error('Device worker "%s" failed within 10 seconds of startup,' +
' will not attempt to restart', device.id)
}
else {
log.info('Restarting worker of "%s"', device.id)
maybeConnect(device)
}
}
})
workers[device.id] = {
device: device
, proc: proc
, started: Date.now()
}
return true
}
return false
}
function maybeDisconnect(device) {
if (isConnected(device)) {
log.info('Releasing worker of %s', device.id)
gracefullyKillWorker(device.id, function() { /* noop */ })
return true
}
return false
}
function tryKillWorker(id) {
var deferred = Promise.defer(),
worker = workers[id]
function onExit() {
deferred.resolve()
}
worker.proc.once('exit', onExit)
worker.proc.kill('SIGTERM')
return deferred.promise.finally(function() {
worker.proc.removeListener('exit', onExit)
})
}
function forceKillWorker(id) {
log.warn('Force killing worker of device "%s"', id)
var deferred = Promise.defer()
, worker = workers[id]
function onExit() {
deferred.resolve()
}
worker.proc.once('exit', onExit)
worker.proc.kill('SIGKILL')
return deferred.promise.finally(function() {
worker.proc.removeListener('exit', onExit)
})
}
function gracefullyKillWorker(id) {
return tryKillWorker(id)
.timeout(10000)
.catch(function() {
log.error('Device worker "%s" did not stop in time', id)
return forceKillWorker(id)
.timeout(10000)
.then(deferred.resolve)
})
}
function gracefullyExit() {
log.info('Stopping all workers')
Promise.all(Object.keys(workers).map(gracefullyKillWorker))
.done(function() {
log.info('All cleaned up')
process.exit(0)
})
}
process.on('SIGINT', function(e) {
log.info('Received SIGINT')
gracefullyExit()
})
process.on('SIGTERM', function(e) {
log.info('Received SIGTERM')
gracefullyExit()
})

29
lib/roles/coordinator.js Normal file
View file

@ -0,0 +1,29 @@
var zmq = require('zmq')
var logger = require('../util/logger')
module.exports = function(options) {
var log = logger.createLogger('coordinator')
if (options.name) {
logger.setGlobalIdentifier(options.name)
}
// App side
var appDealer = zmq.socket('dealer')
options.endpoints.appDealer.forEach(function(endpoint) {
log.info('App dealer connected to %s', endpoint)
appDealer.connect(endpoint)
})
// Device side
var devDealer = zmq.socket('dealer')
options.endpoints.devDealer.forEach(function(endpoint) {
log.info('Device dealer connected to %s', endpoint)
devDealer.connect(endpoint)
})
devDealer.on('message', function() {
log.debug(arguments)
})
}

52
lib/roles/device.js Normal file
View file

@ -0,0 +1,52 @@
var assert = require('assert')
var Promise = require('bluebird')
var zmq = require('zmq')
var adbkit = require('adbkit')
module.exports = function(options) {
var logger = require('../util/logger')
var log = logger.createLogger('device')
// Show serial number in logs
logger.setGlobalIdentifier(options.serial)
// Input
var sub = zmq.socket('sub')
options.endpoints.sub.forEach(function(endpoint) {
log.info('Receiving input from %s', endpoint)
sub.connect(endpoint)
})
sub.on('message', function() {
var args = [].slice.call(target)
, channel = args.unshift()
, cmd = args.unshift()
})
// Respond to messages directed to everyone
sub.subscribe('ALL')
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
log.info('Sending output to %s', endpoint)
push.connect(endpoint)
})
// Introduce worker
push.send(['HELO', options.serial])
function gracefullyExit() {
log.info('Bye')
process.exit(0)
}
process.on('SIGINT', function() {
gracefullyExit()
})
process.on('SIGTERM', function() {
gracefullyExit()
})
}

169
lib/roles/provider.js Normal file
View file

@ -0,0 +1,169 @@
var adb = require('adbkit')
var Promise = require('bluebird')
module.exports = function(options) {
var log = require('../util/logger').createLogger('provider')
var client = adb.createClient()
var workers = Object.create(null)
client.trackDevices(function(err, tracker) {
if (err) {
log.fatal('Unable to track devices: %s', err.message)
throw err
}
log.info('Tracking devices')
tracker.on('add', function(device) {
if (isWantedDevice(device)) {
log.info('Found device "%s" (%s)', device.id, device.type)
maybeConnect(device)
}
else {
log.info('Ignoring device "%s" (%s)', device.id, device.type)
}
})
tracker.on('change', function(device) {
if (isWantedDevice(device)) {
log.info('Device "%s" is now "%s"', device.id, device.type)
maybeConnect(device) || maybeDisconnect(device)
}
})
tracker.on('remove', function(device) {
if (isWantedDevice(device)) {
log.info('Lost device "%s" (%s)', device.id, device.type)
maybeDisconnect(device)
}
})
})
function isWantedDevice(device) {
return options.filter ? options.filter(device) : true
}
function isConnectable(device) {
switch (device.type) {
case 'device':
case 'emulator':
return true
default:
return false
}
}
function isConnected(device) {
return workers[device.id]
}
function maybeConnect(device) {
if (isConnectable(device) && !isConnected(device)) {
log.info('Spawning worker for device "%s"', device.id)
var proc = options.fork(device)
proc.on('error', function(err) {
log.error('Device worker "%s" had an error: %s',
device.id, err.message)
})
proc.on('exit', function(code, signal) {
var data = workers[device.id]
delete workers[device.id]
if (code === 0) {
log.info('Device worker "%s" stopped cleanly', device.id)
}
else {
log.error('Device worker "%s" had a dirty exit (code %d)',
device.id, code)
if (Date.now() - data.started < 10000) {
log.error('Device worker "%s" failed within 10 seconds of startup,' +
' will not attempt to restart', device.id)
}
else {
log.info('Restarting worker of "%s"', device.id)
maybeConnect(device)
}
}
})
workers[device.id] = {
device: device
, proc: proc
, started: Date.now()
}
return true
}
return false
}
function maybeDisconnect(device) {
if (isConnected(device)) {
log.info('Releasing worker of %s', device.id)
gracefullyKillWorker(device.id, function() { /* noop */ })
return true
}
return false
}
function tryKillWorker(id) {
var deferred = Promise.defer(),
worker = workers[id]
function onExit() {
deferred.resolve()
}
worker.proc.once('exit', onExit)
worker.proc.kill('SIGTERM')
return deferred.promise.finally(function() {
worker.proc.removeListener('exit', onExit)
})
}
function forceKillWorker(id) {
log.warn('Force killing worker of device "%s"', id)
var deferred = Promise.defer()
, worker = workers[id]
function onExit() {
deferred.resolve()
}
worker.proc.once('exit', onExit)
worker.proc.kill('SIGKILL')
return deferred.promise.finally(function() {
worker.proc.removeListener('exit', onExit)
})
}
function gracefullyKillWorker(id) {
return tryKillWorker(id)
.timeout(10000)
.catch(function() {
log.error('Device worker "%s" did not stop in time', id)
return forceKillWorker(id)
.timeout(10000)
.then(deferred.resolve)
})
}
function gracefullyExit() {
log.info('Stopping all workers')
Promise.all(Object.keys(workers).map(gracefullyKillWorker))
.done(function() {
log.info('All cleaned up')
process.exit(0)
})
}
process.on('SIGINT', function(e) {
log.info('Received SIGINT')
gracefullyExit()
})
process.on('SIGTERM', function(e) {
log.info('Received SIGTERM')
gracefullyExit()
})
}

34
lib/roles/triproxy.js Normal file
View file

@ -0,0 +1,34 @@
var zmq = require('zmq')
var logger = require('../util/logger')
module.exports = function(options) {
var log = logger.createLogger('triproxy')
if (options.name) {
logger.setGlobalIdentifier(options.name)
}
function proxy(to) {
return function() {
to.send([].slice.call(arguments))
}
}
// App/device output
var pub = zmq.socket('pub')
pub.bindSync(options.endpoints.pub)
log.info('PUB socket bound on', options.endpoints.pub)
// Coordinator input/output
var dealer = zmq.socket('dealer')
dealer.bindSync(options.endpoints.dealer)
dealer.on('message', proxy(pub))
log.info('DEALER socket bound on', options.endpoints.dealer)
// App/device input
var pull = zmq.socket('pull')
pull.bindSync(options.endpoints.pull)
pull.on('message', proxy(dealer))
log.info('PULL socket bound on', options.endpoints.pull)
}

11
lib/util/cliargs.js Normal file
View file

@ -0,0 +1,11 @@
module.exports.list = function(val) {
return val.split(/\s*,\s*/g).filter(Boolean)
}
module.exports.allUnknownArgs = function(args) {
return [].slice.call(args, 0, -1).filter(Boolean)
}
module.exports.lastArg = function(args) {
return args[args.length - 1]
}