diff --git a/lib/cli.js b/lib/cli.js index 81757cee..d12c60aa 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -1,15 +1,165 @@ var program = require('commander') var pkg = require('../package') +var cliargs = require('./util/cliargs') program .version(pkg.version) program - .command('provider') - .description('run STF provider') + .command('provider [serial..]') + .description('start provider') + .option('-s, --connect-dev-sub ', 'device sub endpoint', + cliargs.list) + .option('-p, --connect-dev-push ', 'device push endpoint', + cliargs.list) .action(function() { - require('./provider') + var serials = cliargs.allUnknownArgs(arguments) + , options = cliargs.lastArg(arguments) + + if (!options.connectDevSub) { + this.missingArgument('--connect-dev-sub') + } + if (!options.connectDevPush) { + this.missingArgument('--connect-dev-push') + } + + require('./roles/provider')({ + filter: function(device) { + return serials.length === 0 || serials.indexOf(device.id) !== -1 + } + , fork: function(device) { + var fork = require('child_process').fork + return fork(__filename, [ + 'device', device.id + , '--connect-sub', options.connectDevSub.join(',') + , '--connect-push', options.connectDevPush.join(',') + ]) + } + }) + }) + +program + .command('device ') + .description('start device worker') + .option('-s, --connect-sub ', 'sub endpoint', cliargs.list) + .option('-p, --connect-push ', 'push endpoint', cliargs.list) + .action(function(serial, options) { + if (!options.connectSub) { + this.missingArgument('--connect-sub') + } + if (!options.connectPush) { + this.missingArgument('--connect-push') + } + + require('./roles/device')({ + serial: serial + , endpoints: { + sub: options.connectSub + , push: options.connectPush + } + }) + }) + +program + .command('coordinator ') + .description('start coordinator') + .option('-a, --connect-app-dealer ', 'app dealer endpoint', + cliargs.list) + .option('-d, --connect-dev-dealer ', 'device dealer endpoint', + cliargs.list) + .action(function(name, options) { + if (!options.connectAppDealer) { + this.missingArgument('--connect-app-dealer') + } + if (!options.connectDevDealer) { + this.missingArgument('--connect-dev-dealer') + } + + require('./roles/coordinator')({ + name: name + , endpoints: { + appDealer: options.connectAppDealer + , devDealer: options.connectDevDealer + } + }) + }) + +program + .command('triproxy ') + .description('start triproxy') + .option('-u, --bind-pub ', 'pub endpoint', + String, 'tcp://*:7111') + .option('-d, --bind-dealer ', 'dealer endpoint', + String, 'tcp://*:7112') + .option('-p, --bind-pull ', 'pull endpoint', + String, 'tcp://*:7113') + .action(function(name, options) { + require('./roles/triproxy')({ + name: name + , endpoints: { + pub: options.bindPub + , dealer: options.bindDealer + , pull: options.bindPull + } + }) + }) + +program + .command('local [serial..]') + .description('start everything locally') + .option('--bind-app-pub ', 'app pub endpoint', + String, 'tcp://127.0.0.1:7111') + .option('--bind-app-dealer ', 'app dealer endpoint', + String, 'tcp://127.0.0.1:7112') + .option('--bind-app-pull ', 'app pull endpoint', + String, 'tcp://127.0.0.1:7113') + .option('--bind-dev-pub ', 'device pub endpoint', + String, 'tcp://127.0.0.1:7114') + .option('--bind-dev-dealer ', 'device dealer endpoint', + String, 'tcp://127.0.0.1:7115') + .option('--bind-dev-pull ', 'device pull endpoint', + String, 'tcp://127.0.0.1:7116') + .action(function() { + var options = cliargs.lastArg(arguments) + , fork = require('child_process').fork + + // app triproxy + fork(__filename, [ + 'triproxy', 'app001' + , '--bind-pub', options.bindAppPub + , '--bind-dealer', options.bindAppDealer + , '--bind-pull', options.bindAppPull + ]) + + // device triproxy + fork(__filename, [ + 'triproxy', 'dev001' + , '--bind-pub', options.bindDevPub + , '--bind-dealer', options.bindDevDealer + , '--bind-pull', options.bindDevPull + ]) + + // coordinator one + fork(__filename, [ + 'coordinator', 'coord001' + , '--connect-app-dealer', options.bindAppDealer + , '--connect-dev-dealer', options.bindDevDealer + ]) + + // coordinator two + fork(__filename, [ + 'coordinator', 'coord002' + , '--connect-app-dealer', options.bindAppDealer + , '--connect-dev-dealer', options.bindDevDealer + ]) + + // provider + fork(__filename, [ + 'provider' + , '--connect-dev-sub', options.bindDevPub + , '--connect-dev-push', options.bindDevPull + ].concat(cliargs.allUnknownArgs(arguments))) }) program.parse(process.argv) diff --git a/lib/device.js b/lib/device.js deleted file mode 100644 index d66c3028..00000000 --- a/lib/device.js +++ /dev/null @@ -1,37 +0,0 @@ -var assert = require('assert') - -var Promise = require('bluebird') - -var logger = require('./util/logger') -var log = logger.createLogger('device') - -function readSerialNumber() { - return Promise.try(function() { - assert.ok(process.env.ANDROID_SERIAL, - 'Missing environment variable ANDROID_SERIAL') - return process.env.ANDROID_SERIAL - }) -} - -function gracefullyExit() { - log.info('Bye') - process.exit(0) -} - -process.on('SIGINT', function() { - gracefullyExit() -}) - -process.on('SIGTERM', function() { - gracefullyExit() -}) - -Promise.spawn(function* () { - var serial = yield readSerialNumber() - - // Show serial number in logs - logger.setGlobalIdentifier(serial) - - // Report - log.info('Started') -}) diff --git a/lib/provider.js b/lib/provider.js deleted file mode 100644 index 651ec194..00000000 --- a/lib/provider.js +++ /dev/null @@ -1,161 +0,0 @@ -var path = require('path') -var fork = require('child_process').fork - -var adb = require('adbkit') -var Promise = require('bluebird') - -var log = require('./util/logger').createLogger('provider') -var client = adb.createClient() -var workers = Object.create(null) - -client.trackDevices(function(err, tracker) { - if (err) { - log.fatal('Unable to track devices: %s', err.message) - throw err - } - - log.info('Tracking devices') - - tracker.on('add', function(device) { - log.info('Found device "%s" (%s)', device.id, device.type) - maybeConnect(device) - }) - - tracker.on('change', function(device) { - log.info('Device "%s" is now "%s"', device.id, device.type) - maybeConnect(device) || maybeDisconnect(device) - }) - - tracker.on('remove', function(device) { - log.info('Lost device "%s" (%s)', device.id, device.type) - maybeDisconnect(device) - }) -}) - -function isConnectable(device) { - switch (device.type) { - case 'device': - case 'emulator': - return true - default: - return false - } -} - -function isConnected(device) { - return workers[device.id] -} - -function maybeConnect(device) { - if (isConnectable(device) && !isConnected(device)) { - log.info('Spawning worker for device "%s"', device.id) - var proc = fork(path.join(__dirname, 'device'), { - env: { - ANDROID_SERIAL: device.id - } - }) - proc.on('error', function(err) { - log.error('Device worker "%s" had an error: %s', - device.id, err.message) - }) - proc.on('exit', function(code, signal) { - var data = workers[device.id] - delete workers[device.id] - if (code === 0) { - log.info('Device worker "%s" stopped cleanly', device.id) - } - else { - log.error('Device worker "%s" had a dirty exit (code %d)', - device.id, code) - if (Date.now() - data.started < 10000) { - log.error('Device worker "%s" failed within 10 seconds of startup,' + - ' will not attempt to restart', device.id) - } - else { - log.info('Restarting worker of "%s"', device.id) - maybeConnect(device) - } - } - }) - workers[device.id] = { - device: device - , proc: proc - , started: Date.now() - } - return true - } - return false -} - -function maybeDisconnect(device) { - if (isConnected(device)) { - log.info('Releasing worker of %s', device.id) - gracefullyKillWorker(device.id, function() { /* noop */ }) - return true - } - return false -} - -function tryKillWorker(id) { - var deferred = Promise.defer(), - worker = workers[id] - - function onExit() { - deferred.resolve() - } - - worker.proc.once('exit', onExit) - worker.proc.kill('SIGTERM') - - return deferred.promise.finally(function() { - worker.proc.removeListener('exit', onExit) - }) -} - -function forceKillWorker(id) { - log.warn('Force killing worker of device "%s"', id) - - var deferred = Promise.defer() - , worker = workers[id] - - function onExit() { - deferred.resolve() - } - - worker.proc.once('exit', onExit) - worker.proc.kill('SIGKILL') - - return deferred.promise.finally(function() { - worker.proc.removeListener('exit', onExit) - }) -} - -function gracefullyKillWorker(id) { - return tryKillWorker(id) - .timeout(10000) - .catch(function() { - log.error('Device worker "%s" did not stop in time', id) - return forceKillWorker(id) - .timeout(10000) - .then(deferred.resolve) - }) -} - -function gracefullyExit() { - log.info('Stopping all workers') - Promise.all(Object.keys(workers).map(gracefullyKillWorker)) - .done(function() { - log.info('All cleaned up') - process.exit(0) - }) -} - -process.on('SIGINT', function(e) { - log.info('Received SIGINT') - gracefullyExit() -}) - -process.on('SIGTERM', function(e) { - log.info('Received SIGTERM') - gracefullyExit() -}) diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js new file mode 100644 index 00000000..9d590cbb --- /dev/null +++ b/lib/roles/coordinator.js @@ -0,0 +1,29 @@ +var zmq = require('zmq') + +var logger = require('../util/logger') + +module.exports = function(options) { + var log = logger.createLogger('coordinator') + + if (options.name) { + logger.setGlobalIdentifier(options.name) + } + + // App side + var appDealer = zmq.socket('dealer') + options.endpoints.appDealer.forEach(function(endpoint) { + log.info('App dealer connected to %s', endpoint) + appDealer.connect(endpoint) + }) + + // Device side + var devDealer = zmq.socket('dealer') + options.endpoints.devDealer.forEach(function(endpoint) { + log.info('Device dealer connected to %s', endpoint) + devDealer.connect(endpoint) + }) + + devDealer.on('message', function() { + log.debug(arguments) + }) +} diff --git a/lib/roles/device.js b/lib/roles/device.js new file mode 100644 index 00000000..09178116 --- /dev/null +++ b/lib/roles/device.js @@ -0,0 +1,52 @@ +var assert = require('assert') + +var Promise = require('bluebird') +var zmq = require('zmq') +var adbkit = require('adbkit') + +module.exports = function(options) { + var logger = require('../util/logger') + var log = logger.createLogger('device') + + // Show serial number in logs + logger.setGlobalIdentifier(options.serial) + + // Input + var sub = zmq.socket('sub') + options.endpoints.sub.forEach(function(endpoint) { + log.info('Receiving input from %s', endpoint) + sub.connect(endpoint) + }) + + sub.on('message', function() { + var args = [].slice.call(target) + , channel = args.unshift() + , cmd = args.unshift() + }) + + // Respond to messages directed to everyone + sub.subscribe('ALL') + + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + + // Introduce worker + push.send(['HELO', options.serial]) + + function gracefullyExit() { + log.info('Bye') + process.exit(0) + } + + process.on('SIGINT', function() { + gracefullyExit() + }) + + process.on('SIGTERM', function() { + gracefullyExit() + }) +} diff --git a/lib/roles/provider.js b/lib/roles/provider.js new file mode 100644 index 00000000..135e63ce --- /dev/null +++ b/lib/roles/provider.js @@ -0,0 +1,169 @@ +var adb = require('adbkit') +var Promise = require('bluebird') + +module.exports = function(options) { + var log = require('../util/logger').createLogger('provider') + var client = adb.createClient() + var workers = Object.create(null) + + client.trackDevices(function(err, tracker) { + if (err) { + log.fatal('Unable to track devices: %s', err.message) + throw err + } + + log.info('Tracking devices') + + tracker.on('add', function(device) { + if (isWantedDevice(device)) { + log.info('Found device "%s" (%s)', device.id, device.type) + maybeConnect(device) + } + else { + log.info('Ignoring device "%s" (%s)', device.id, device.type) + } + }) + + tracker.on('change', function(device) { + if (isWantedDevice(device)) { + log.info('Device "%s" is now "%s"', device.id, device.type) + maybeConnect(device) || maybeDisconnect(device) + } + }) + + tracker.on('remove', function(device) { + if (isWantedDevice(device)) { + log.info('Lost device "%s" (%s)', device.id, device.type) + maybeDisconnect(device) + } + }) + }) + + function isWantedDevice(device) { + return options.filter ? options.filter(device) : true + } + + function isConnectable(device) { + switch (device.type) { + case 'device': + case 'emulator': + return true + default: + return false + } + } + + function isConnected(device) { + return workers[device.id] + } + + function maybeConnect(device) { + if (isConnectable(device) && !isConnected(device)) { + log.info('Spawning worker for device "%s"', device.id) + var proc = options.fork(device) + proc.on('error', function(err) { + log.error('Device worker "%s" had an error: %s', + device.id, err.message) + }) + proc.on('exit', function(code, signal) { + var data = workers[device.id] + delete workers[device.id] + if (code === 0) { + log.info('Device worker "%s" stopped cleanly', device.id) + } + else { + log.error('Device worker "%s" had a dirty exit (code %d)', + device.id, code) + if (Date.now() - data.started < 10000) { + log.error('Device worker "%s" failed within 10 seconds of startup,' + + ' will not attempt to restart', device.id) + } + else { + log.info('Restarting worker of "%s"', device.id) + maybeConnect(device) + } + } + }) + workers[device.id] = { + device: device + , proc: proc + , started: Date.now() + } + return true + } + return false + } + + function maybeDisconnect(device) { + if (isConnected(device)) { + log.info('Releasing worker of %s', device.id) + gracefullyKillWorker(device.id, function() { /* noop */ }) + return true + } + return false + } + + function tryKillWorker(id) { + var deferred = Promise.defer(), + worker = workers[id] + + function onExit() { + deferred.resolve() + } + + worker.proc.once('exit', onExit) + worker.proc.kill('SIGTERM') + + return deferred.promise.finally(function() { + worker.proc.removeListener('exit', onExit) + }) + } + + function forceKillWorker(id) { + log.warn('Force killing worker of device "%s"', id) + + var deferred = Promise.defer() + , worker = workers[id] + + function onExit() { + deferred.resolve() + } + + worker.proc.once('exit', onExit) + worker.proc.kill('SIGKILL') + + return deferred.promise.finally(function() { + worker.proc.removeListener('exit', onExit) + }) + } + + function gracefullyKillWorker(id) { + return tryKillWorker(id) + .timeout(10000) + .catch(function() { + log.error('Device worker "%s" did not stop in time', id) + return forceKillWorker(id) + .timeout(10000) + .then(deferred.resolve) + }) + } + + function gracefullyExit() { + log.info('Stopping all workers') + Promise.all(Object.keys(workers).map(gracefullyKillWorker)) + .done(function() { + log.info('All cleaned up') + process.exit(0) + }) + } + + process.on('SIGINT', function(e) { + log.info('Received SIGINT') + gracefullyExit() + }) + + process.on('SIGTERM', function(e) { + log.info('Received SIGTERM') + gracefullyExit() + }) +} diff --git a/lib/roles/triproxy.js b/lib/roles/triproxy.js new file mode 100644 index 00000000..df8fe196 --- /dev/null +++ b/lib/roles/triproxy.js @@ -0,0 +1,34 @@ +var zmq = require('zmq') + +var logger = require('../util/logger') + +module.exports = function(options) { + var log = logger.createLogger('triproxy') + + if (options.name) { + logger.setGlobalIdentifier(options.name) + } + + function proxy(to) { + return function() { + to.send([].slice.call(arguments)) + } + } + + // App/device output + var pub = zmq.socket('pub') + pub.bindSync(options.endpoints.pub) + log.info('PUB socket bound on', options.endpoints.pub) + + // Coordinator input/output + var dealer = zmq.socket('dealer') + dealer.bindSync(options.endpoints.dealer) + dealer.on('message', proxy(pub)) + log.info('DEALER socket bound on', options.endpoints.dealer) + + // App/device input + var pull = zmq.socket('pull') + pull.bindSync(options.endpoints.pull) + pull.on('message', proxy(dealer)) + log.info('PULL socket bound on', options.endpoints.pull) +} diff --git a/lib/util/cliargs.js b/lib/util/cliargs.js new file mode 100644 index 00000000..774220f8 --- /dev/null +++ b/lib/util/cliargs.js @@ -0,0 +1,11 @@ +module.exports.list = function(val) { + return val.split(/\s*,\s*/g).filter(Boolean) +} + +module.exports.allUnknownArgs = function(args) { + return [].slice.call(args, 0, -1).filter(Boolean) +} + +module.exports.lastArg = function(args) { + return args[args.length - 1] +}