diff --git a/lib/cli.js b/lib/cli.js index d12c60aa..558cfaf2 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -105,6 +105,27 @@ program }) }) +program + .command('console') + .description('start console') + .option('-s, --connect-sub ', 'sub endpoint', cliargs.list) + .option('-p, --connect-push ', 'push endpoint', cliargs.list) + .action(function(options) { + if (!options.connectSub) { + this.missingArgument('--connect-sub') + } + if (!options.connectPush) { + this.missingArgument('--connect-push') + } + + require('./roles/console')({ + endpoints: { + sub: options.connectSub + , push: options.connectPush + } + }) + }) + program .command('local [serial..]') .description('start everything locally') diff --git a/lib/roles/console.js b/lib/roles/console.js new file mode 100644 index 00000000..506592da --- /dev/null +++ b/lib/roles/console.js @@ -0,0 +1,87 @@ +var readline = require('readline') + +var zmq = require('zmq') +var Promise = require('bluebird') + +var logger = require('../util/logger') +var tx = require('../util/tx') + +module.exports = function(options) { + var log = logger.createLogger('console') + + // Input + var sub = zmq.socket('sub') + sub.subscribe('ALL') + options.endpoints.sub.forEach(function(endpoint) { + log.info('SUB connected to %s', endpoint) + sub.connect(endpoint) + }) + + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('PUSH connected to %s', endpoint) + push.connect(endpoint) + }) + + // User input + var rl = readline.createInterface({ + input: process.stdin + , output: process.stdout + }) + rl.setPrompt('stf> ') + rl.prompt() + + rl.on('line', function(line) { + var args = line.trim().split(/\s+/g) + switch (args.shift()) { + case 'help': + console.log() + console.log('Available commands:') + console.log() + console.log(' help - show help') + console.log() + rl.prompt() + break + case 'ls': + tx.q(push, sub, 'ALL', ['ls']) + .timeout(1000) + .then(function(results) { + results.forEach(function(result) { + console.log('%s', result.serial) + }) + }) + .catch(Promise.TimeoutError, function(err) { + console.log(err.message) + }) + .finally(function() { + rl.prompt() + }) + break + case 'shell': + tx.q(push, sub, 'ALL', ['shell', args.join(' ')]) + .timeout(1000) + .then(function(results) { + results.forEach(function(result) { + console.log('%s: %s', result.serial, result.value.toString().trim()) + }) + }) + .catch(Promise.TimeoutError, function(err) { + console.log(err.message) + }) + .finally(function() { + rl.prompt() + }) + break + case 'exit': + case 'quit': + process.exit(0) + break + default: + console.log('Unknown command') + rl.prompt() + break + } + }) + +} diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js index 9d590cbb..b6559d6a 100644 --- a/lib/roles/coordinator.js +++ b/lib/roles/coordinator.js @@ -16,6 +16,10 @@ module.exports = function(options) { appDealer.connect(endpoint) }) + appDealer.on('message', function(channel, id, cmd) { + devDealer.send([].slice.call(arguments)) + }) + // Device side var devDealer = zmq.socket('dealer') options.endpoints.devDealer.forEach(function(endpoint) { @@ -24,6 +28,6 @@ module.exports = function(options) { }) devDealer.on('message', function() { - log.debug(arguments) + appDealer.send([].slice.call(arguments)) }) } diff --git a/lib/roles/device.js b/lib/roles/device.js index 09178116..a3e36ea0 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -18,10 +18,34 @@ module.exports = function(options) { sub.connect(endpoint) }) - sub.on('message', function() { - var args = [].slice.call(target) - , channel = args.unshift() - , cmd = args.unshift() + sub.on('message', function(channel, id, cmd) { + push.send([id, options.serial, 'ACK']) + switch (cmd.toString()) { + case 'ls': + log.info('Responding to "ls"') + push.send([id, options.serial, 'OKY']) + break + case 'shell': + var line = arguments[3] + log.info('Running shell command "%s"', line) + adb.shellAsync(options.serial, line) + .then(function(out) { + var chunks = [] + out.on('data', function(chunk) { + chunks.push(chunk) + }) + out.on('end', function(chunk) { + push.send([id, options.serial, 'OKY', Buffer.concat(chunks)]) + }) + }) + .catch(function(err) { + push.send([id, options.serial, 'ERR', err.message]) + }) + break + default: + log.warn('Unknown command "%s"', cmd) + break + } }) // Respond to messages directed to everyone @@ -35,7 +59,10 @@ module.exports = function(options) { }) // Introduce worker - push.send(['HELO', options.serial]) + // push.send(['HELO', options.serial]) + + // Adb + var adb = Promise.promisifyAll(adbkit.createClient()) function gracefullyExit() { log.info('Bye') diff --git a/lib/util/tx.js b/lib/util/tx.js new file mode 100644 index 00000000..0d4c0b61 --- /dev/null +++ b/lib/util/tx.js @@ -0,0 +1,77 @@ +var uuid = require('node-uuid') +var Promise = require('bluebird') + +function newId() { + return uuid.v4() +} + +module.exports.newId = newId + +function q(output, input, channel, args) { + var deferred = Promise.defer() + , ourId = newId() + , results = [] + , mapping = {} + , remaining = 0 // @todo pass expected number to query + + function onMessage(theirId, serial, state, data) { + if (ourId === theirId.toString()) { + serial = serial.toString() + state = state.toString() + + var mapped = mapping[serial] + + if (!mapped) { + results.push(mapped = mapping[serial] = { + serial: serial + , state: state + , progress: 0 + , value: null + }) + } + else { + mapped.state = state + } + + switch (state) { + case 'ACK': + deferred.progress(results) + ++remaining + break + case 'PRG': + mapped.progress = +data + deferred.progress(results) + break + case 'ERR': + mapped.value = data + --remaining + break + case 'OKY': + mapped.progress = 100 + mapped.value = data + --remaining + break + } + + if (remaining) { + deferred.progress(results) + } + else { + deferred.resolve(results) + } + } + } + + input.on('message', onMessage) + input.subscribe(ourId) + + output.send([channel, ourId].concat(args)) + + return deferred.promise.finally(function() { + input.unsubscribe(ourId) + input.removeListener('message', onMessage) + mapping = results = null + }) +} + +module.exports.q = q