diff --git a/lib/roles/console.js b/lib/roles/console.js index 506592da..a119eab9 100644 --- a/lib/roles/console.js +++ b/lib/roles/console.js @@ -5,6 +5,8 @@ var Promise = require('bluebird') var logger = require('../util/logger') var tx = require('../util/tx') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) module.exports = function(options) { var log = logger.createLogger('console') @@ -43,35 +45,51 @@ module.exports = function(options) { console.log() rl.prompt() break - case 'ls': - tx.q(push, sub, 'ALL', ['ls']) - .timeout(1000) - .then(function(results) { - results.forEach(function(result) { - console.log('%s', result.serial) - }) - }) - .catch(Promise.TimeoutError, function(err) { - console.log(err.message) - }) - .finally(function() { - rl.prompt() - }) - break case 'shell': - tx.q(push, sub, 'ALL', ['shell', args.join(' ')]) - .timeout(1000) - .then(function(results) { - results.forEach(function(result) { - console.log('%s: %s', result.serial, result.value.toString().trim()) - }) - }) - .catch(Promise.TimeoutError, function(err) { - console.log(err.message) - }) - .finally(function() { - rl.prompt() - }) + var resolver = Promise.defer() + , ours = wireutil.makePrivateChannel() + + log.debug('Using channel "%s"', ours) + + function messageListener(theirs, data) { + if (theirs.toString() === ours) { + var wrapper = wire.Envelope.decode(data) + switch (wrapper.type) { + case wire.MessageType.DEVICE_DATA: + var message = wire.DeviceDataMessage.decode(wrapper.message) + log.info('[%s] DATA: %s' + , message.serial, message.data.toUTF8().trim()) + break + case wire.MessageType.DEVICE_DONE: + var message = wire.DeviceDoneMessage.decode(wrapper.message) + log.info('[%s] DONE', message.serial) + resolver.resolve() // @todo Wait till all devices have finished + break + case wire.MessageType.DEVICE_FAIL: + var message = wire.DeviceFailMessage.decode(wrapper.message) + log.error('[%s] FAIL: ', message.serial, message.reason) + resolver.reject(new Error(message.reason)) + break + default: + resolver.reject(new Error('Unexpected response')) + break + } + } + } + + sub.subscribe(ours) + sub.on('message', messageListener) + + push.send([wireutil.global, wireutil.makeShellCommandMessage( + ours + , args + )]) + + resolver.promise.finally(function() { + sub.unsubscribe(ours) + sub.removeListener('message', messageListener) + rl.prompt() + }) break case 'exit': case 'quit': diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js index 20a73583..a6282818 100644 --- a/lib/roles/coordinator.js +++ b/lib/roles/coordinator.js @@ -18,8 +18,8 @@ module.exports = function(options) { appDealer.connect(endpoint) }) - appDealer.on('message', function(channel, id, cmd) { - devDealer.send([].slice.call(arguments)) + appDealer.on('message', function(channel, data) { + devDealer.send([channel, data]) }) // Device side @@ -42,6 +42,18 @@ module.exports = function(options) { case wire.MessageType.DEVICE_PROPERTIES: var message = wire.DevicePropertiesMessage.decode(wrapper.message) break + case wire.MessageType.DEVICE_DATA: + var message = wire.DeviceDataMessage.decode(wrapper.message) + appDealer.send([channel, data]) + break + case wire.MessageType.DEVICE_DONE: + var message = wire.DeviceDoneMessage.decode(wrapper.message) + appDealer.send([channel, data]) + break + case wire.MessageType.DEVICE_FAIL: + var message = wire.DeviceFailMessage.decode(wrapper.message) + appDealer.send([channel, data]) + break } }) } diff --git a/lib/roles/device.js b/lib/roles/device.js index 3a80fd04..1c751f00 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -9,6 +9,7 @@ var wire = require('../wire') var wireutil = require('../util/wireutil')(wire) var devutil = require('../util/devutil') var ChannelManager = require('../wire/channelmanager') +var streamutil = require('../util/streamutil') module.exports = function(options) { var log = logger.createLogger('device') @@ -67,6 +68,56 @@ module.exports = function(options) { wireutil.makeDevicePropertiesMessage(options.serial, properties)]) }) break + case wire.MessageType.SHELL_COMMAND: + var message = wire.ShellCommandMessage.decode(wrapper.message) + adb.shellAsync(options.serial, message.command) + .then(function(stream) { + var resolver = Promise.defer() + , seq = 0 + + function dataListener(chunk) { + push.send([message.channel, + wireutil.makeDeviceDataMessage( + options.serial + , seq++ + , chunk + )]) + } + + function endListener() { + push.send([message.channel, + wireutil.makeDeviceDoneMessage(options.serial)]) + resolver.resolve() + } + + function errorListener(err) { + log.error('Shell command "%s" failed due to "%s"' + , message.command.join(' '), err.message) + resolver.reject(err) + push.send([message.channel, + wireutil.makeDeviceFailMessage( + options.serial + , err.message + )]) + } + + stream.on('data', dataListener) + stream.on('end', endListener) + stream.on('error', errorListener) + + return resolver.promise.finally(function() { + stream.removeListener('data', dataListener) + stream.removeListener('end', endListener) + stream.removeListener('error', errorListener) + }) + }) + .error(function(err) { + log.error('Shell command "%s" failed due to "%s"' + , message.command.join(' '), err.message) + push.send([message.channel, + wire.makeShellCommandFailMessage(options.serial, err.message)]) + }) + break } }) diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js index 59a14d57..062fc601 100644 --- a/lib/util/wireutil.js +++ b/lib/util/wireutil.js @@ -89,6 +89,22 @@ module.exports = function(wire) { var message = new wire.ProbeMessage() return wireutil.envelope(wire.MessageType.PROBE, message) } + , makeShellCommandMessage: function(channel, command) { + var message = new wire.ShellCommandMessage(channel, command) + return wireutil.envelope(wire.MessageType.SHELL_COMMAND, message) + } + , makeDeviceDataMessage: function(serial, seq, chunk) { + var message = new wire.DeviceDataMessage(serial, seq, chunk) + return wireutil.envelope(wire.MessageType.DEVICE_DATA, message) + } + , makeDeviceDoneMessage: function(serial) { + var message = new wire.DeviceDoneMessage(serial) + return wireutil.envelope(wire.MessageType.DEVICE_DONE, message) + } + , makeDeviceFailMessage: function(serial, reason) { + var message = new wire.DeviceFailMessage(serial, reason) + return wireutil.envelope(wire.MessageType.DEVICE_FAIL, message) + } } return wireutil diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 437d5330..e86dc521 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -9,6 +9,10 @@ enum MessageType { JOIN_GROUP = 6; LEAVE_GROUP = 7; PROBE = 8; + SHELL_COMMAND = 9; + DEVICE_DATA = 10; + DEVICE_DONE = 11; + DEVICE_FAIL = 12; } message Envelope { @@ -85,3 +89,25 @@ message JoinGroupMessage { message LeaveGroupMessage { required string serial = 1; } + +// Commands + +message ShellCommandMessage { + required string channel = 1; + repeated string command = 2; +} + +message DeviceDataMessage { + required string serial = 1; + required uint32 seq = 2; + required bytes data = 3; +} + +message DeviceDoneMessage { + required string serial = 1; +} + +message DeviceFailMessage { + required string serial = 1; + required string reason = 2; +}