From 35ac51795b0612f860d96d50b6e5801d4889e84e Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Thu, 23 Jan 2014 14:00:15 +0900 Subject: [PATCH] Initial version of multidevice transactions. --- lib/roles/console.js | 78 ++++++++++++++++++++++++++++++++++------ lib/roles/coordinator.js | 8 +++++ lib/roles/device.js | 6 ++-- lib/util/wireutil.js | 9 +++++ lib/wire/wire.proto | 2 +- 5 files changed, 88 insertions(+), 15 deletions(-) diff --git a/lib/roles/console.js b/lib/roles/console.js index a119eab9..d82758f9 100644 --- a/lib/roles/console.js +++ b/lib/roles/console.js @@ -34,6 +34,9 @@ module.exports = function(options) { rl.setPrompt('stf> ') rl.prompt() + var clients = [] + , group = wireutil.global + rl.on('line', function(line) { var args = line.trim().split(/\s+/g) switch (args.shift()) { @@ -45,13 +48,61 @@ module.exports = function(options) { console.log() rl.prompt() break + case 'invite': + if (clients.length) { + log.error('We already have clients') + break + } + + var ours = group = wireutil.makePrivateChannel() + + var messageListener = function(theirs, data) { + if (theirs.toString() === ours) { + var wrapper = wire.Envelope.decode(data) + switch (wrapper.type) { + case wire.MessageType.JOIN_GROUP: + var message = wire.JoinGroupMessage.decode(wrapper.message) + clients.push(message.serial) + log.info('"%s" joined', message.serial) + break + case wire.MessageType.LEAVE_GROUP: + var message = wire.LeaveGroupMessage.decode(wrapper.message) + , index = clients.indexOf(message.serial) + clients.splice(index, 1) + log.info('"%s" left', message.serial) + break + default: + throw new Error('Unexpected response') + } + } + } + + sub.subscribe(ours) + sub.on('message', messageListener) + + push.send([wireutil.global, wireutil.makeGroupMessage( + ours + , 10000 + , [] + )]) + + Promise.delay(1000) + .then(function() { + rl.prompt() + }) + break case 'shell': - var resolver = Promise.defer() + var resolvers = {} , ours = wireutil.makePrivateChannel() + , counter = 0 log.debug('Using channel "%s"', ours) - function messageListener(theirs, data) { + clients.forEach(function(client) { + resolvers[client] = Promise.defer() + }) + + var messageListener = function(theirs, data) { if (theirs.toString() === ours) { var wrapper = wire.Envelope.decode(data) switch (wrapper.type) { @@ -63,33 +114,38 @@ module.exports = function(options) { case wire.MessageType.DEVICE_DONE: var message = wire.DeviceDoneMessage.decode(wrapper.message) log.info('[%s] DONE', message.serial) - resolver.resolve() // @todo Wait till all devices have finished + resolvers[message.serial].resolve() break case wire.MessageType.DEVICE_FAIL: var message = wire.DeviceFailMessage.decode(wrapper.message) log.error('[%s] FAIL: ', message.serial, message.reason) - resolver.reject(new Error(message.reason)) + resolvers[message.serial].reject(new Error(message.reason)) break default: - resolver.reject(new Error('Unexpected response')) - break + throw new Error('Unexpected response') } } } + sub.subscribe(ours) sub.on('message', messageListener) - push.send([wireutil.global, wireutil.makeShellCommandMessage( + push.send([group, wireutil.makeShellCommandMessage( ours , args )]) - resolver.promise.finally(function() { - sub.unsubscribe(ours) - sub.removeListener('message', messageListener) - rl.prompt() + var promises = Object.keys(resolvers).map(function(serial) { + return resolvers[serial].promise }) + + Promise.settle(promises) + .then(function() { + sub.unsubscribe(ours) + sub.removeListener('message', messageListener) + rl.prompt() + }) break case 'exit': case 'quit': diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js index a6282818..edd1c714 100644 --- a/lib/roles/coordinator.js +++ b/lib/roles/coordinator.js @@ -32,6 +32,14 @@ module.exports = function(options) { devDealer.on('message', function(channel, data) { var wrapper = wire.Envelope.decode(data) switch (wrapper.type) { + case wire.MessageType.JOIN_GROUP: + var message = wire.JoinGroupMessage.decode(wrapper.message) + appDealer.send([channel, data]) + break + case wire.MessageType.LEAVE_GROUP: + var message = wire.LeaveGroupMessage.decode(wrapper.message) + appDealer.send([channel, data]) + break case wire.MessageType.DEVICE_POKE: var message = wire.DevicePokeMessage.decode(wrapper.message) devDealer.send([message.channel, wireutil.makeProbeMessage()]) diff --git a/lib/roles/device.js b/lib/roles/device.js index 1c751f00..7e4c6b1a 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -9,7 +9,6 @@ var wire = require('../wire') var wireutil = require('../util/wireutil')(wire) var devutil = require('../util/devutil') var ChannelManager = require('../wire/channelmanager') -var streamutil = require('../util/streamutil') module.exports = function(options) { var log = logger.createLogger('device') @@ -50,7 +49,7 @@ module.exports = function(options) { switch (wrapper.type) { case wire.MessageType.GROUP: var message = wire.GroupMessage.decode(wrapper.message) - , groupChannel = message.group + , groupChannel = message.channel if (wireutil.matchesRequirements(identity, message.requirements)) { channels.register(groupChannel, message.timeout) log.info('Subscribing to group channel "%s"', groupChannel) @@ -70,6 +69,7 @@ module.exports = function(options) { break case wire.MessageType.SHELL_COMMAND: var message = wire.ShellCommandMessage.decode(wrapper.message) + log.info('Running shell command "%s"', message.command.join(' ')) adb.shellAsync(options.serial, message.command) .then(function(stream) { var resolver = Promise.defer() @@ -115,7 +115,7 @@ module.exports = function(options) { log.error('Shell command "%s" failed due to "%s"' , message.command.join(' '), err.message) push.send([message.channel, - wire.makeShellCommandFailMessage(options.serial, err.message)]) + wire.makeDeviceFailMessage(options.serial, err.message)]) }) break } diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js index 062fc601..dd6d9cc5 100644 --- a/lib/util/wireutil.js +++ b/lib/util/wireutil.js @@ -55,6 +55,15 @@ module.exports = function(wire) { , envelope: function(type, message) { return new wire.Envelope(type, message.encode()).encodeNB() } + , makeGroupMessage: function(channel, timeout, requirements) { + var message = new wire.GroupMessage( + channel + , timeout + , requirements + ) + + return wireutil.envelope(wire.MessageType.GROUP, message) + } , makeJoinGroupMessage: function(serial) { var message = new wire.JoinGroupMessage(serial) return wireutil.envelope(wire.MessageType.JOIN_GROUP, message) diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index e86dc521..3edee7e2 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -77,7 +77,7 @@ message DeviceRequirement { } message GroupMessage { - required string group = 1; + required string channel = 1; required uint32 timeout = 2; repeated DeviceRequirement requirements = 3; }