mirror of
https://github.com/openstf/stf
synced 2025-10-04 18:29:17 +02:00
Initial version of multidevice transactions.
This commit is contained in:
parent
242f6f17de
commit
35ac51795b
5 changed files with 88 additions and 15 deletions
|
@ -34,6 +34,9 @@ module.exports = function(options) {
|
|||
rl.setPrompt('stf> ')
|
||||
rl.prompt()
|
||||
|
||||
var clients = []
|
||||
, group = wireutil.global
|
||||
|
||||
rl.on('line', function(line) {
|
||||
var args = line.trim().split(/\s+/g)
|
||||
switch (args.shift()) {
|
||||
|
@ -45,13 +48,61 @@ module.exports = function(options) {
|
|||
console.log()
|
||||
rl.prompt()
|
||||
break
|
||||
case 'invite':
|
||||
if (clients.length) {
|
||||
log.error('We already have clients')
|
||||
break
|
||||
}
|
||||
|
||||
var ours = group = wireutil.makePrivateChannel()
|
||||
|
||||
var messageListener = function(theirs, data) {
|
||||
if (theirs.toString() === ours) {
|
||||
var wrapper = wire.Envelope.decode(data)
|
||||
switch (wrapper.type) {
|
||||
case wire.MessageType.JOIN_GROUP:
|
||||
var message = wire.JoinGroupMessage.decode(wrapper.message)
|
||||
clients.push(message.serial)
|
||||
log.info('"%s" joined', message.serial)
|
||||
break
|
||||
case wire.MessageType.LEAVE_GROUP:
|
||||
var message = wire.LeaveGroupMessage.decode(wrapper.message)
|
||||
, index = clients.indexOf(message.serial)
|
||||
clients.splice(index, 1)
|
||||
log.info('"%s" left', message.serial)
|
||||
break
|
||||
default:
|
||||
throw new Error('Unexpected response')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub.subscribe(ours)
|
||||
sub.on('message', messageListener)
|
||||
|
||||
push.send([wireutil.global, wireutil.makeGroupMessage(
|
||||
ours
|
||||
, 10000
|
||||
, []
|
||||
)])
|
||||
|
||||
Promise.delay(1000)
|
||||
.then(function() {
|
||||
rl.prompt()
|
||||
})
|
||||
break
|
||||
case 'shell':
|
||||
var resolver = Promise.defer()
|
||||
var resolvers = {}
|
||||
, ours = wireutil.makePrivateChannel()
|
||||
, counter = 0
|
||||
|
||||
log.debug('Using channel "%s"', ours)
|
||||
|
||||
function messageListener(theirs, data) {
|
||||
clients.forEach(function(client) {
|
||||
resolvers[client] = Promise.defer()
|
||||
})
|
||||
|
||||
var messageListener = function(theirs, data) {
|
||||
if (theirs.toString() === ours) {
|
||||
var wrapper = wire.Envelope.decode(data)
|
||||
switch (wrapper.type) {
|
||||
|
@ -63,29 +114,34 @@ module.exports = function(options) {
|
|||
case wire.MessageType.DEVICE_DONE:
|
||||
var message = wire.DeviceDoneMessage.decode(wrapper.message)
|
||||
log.info('[%s] DONE', message.serial)
|
||||
resolver.resolve() // @todo Wait till all devices have finished
|
||||
resolvers[message.serial].resolve()
|
||||
break
|
||||
case wire.MessageType.DEVICE_FAIL:
|
||||
var message = wire.DeviceFailMessage.decode(wrapper.message)
|
||||
log.error('[%s] FAIL: ', message.serial, message.reason)
|
||||
resolver.reject(new Error(message.reason))
|
||||
resolvers[message.serial].reject(new Error(message.reason))
|
||||
break
|
||||
default:
|
||||
resolver.reject(new Error('Unexpected response'))
|
||||
break
|
||||
throw new Error('Unexpected response')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
sub.subscribe(ours)
|
||||
sub.on('message', messageListener)
|
||||
|
||||
push.send([wireutil.global, wireutil.makeShellCommandMessage(
|
||||
push.send([group, wireutil.makeShellCommandMessage(
|
||||
ours
|
||||
, args
|
||||
)])
|
||||
|
||||
resolver.promise.finally(function() {
|
||||
var promises = Object.keys(resolvers).map(function(serial) {
|
||||
return resolvers[serial].promise
|
||||
})
|
||||
|
||||
Promise.settle(promises)
|
||||
.then(function() {
|
||||
sub.unsubscribe(ours)
|
||||
sub.removeListener('message', messageListener)
|
||||
rl.prompt()
|
||||
|
|
|
@ -32,6 +32,14 @@ module.exports = function(options) {
|
|||
devDealer.on('message', function(channel, data) {
|
||||
var wrapper = wire.Envelope.decode(data)
|
||||
switch (wrapper.type) {
|
||||
case wire.MessageType.JOIN_GROUP:
|
||||
var message = wire.JoinGroupMessage.decode(wrapper.message)
|
||||
appDealer.send([channel, data])
|
||||
break
|
||||
case wire.MessageType.LEAVE_GROUP:
|
||||
var message = wire.LeaveGroupMessage.decode(wrapper.message)
|
||||
appDealer.send([channel, data])
|
||||
break
|
||||
case wire.MessageType.DEVICE_POKE:
|
||||
var message = wire.DevicePokeMessage.decode(wrapper.message)
|
||||
devDealer.send([message.channel, wireutil.makeProbeMessage()])
|
||||
|
|
|
@ -9,7 +9,6 @@ var wire = require('../wire')
|
|||
var wireutil = require('../util/wireutil')(wire)
|
||||
var devutil = require('../util/devutil')
|
||||
var ChannelManager = require('../wire/channelmanager')
|
||||
var streamutil = require('../util/streamutil')
|
||||
|
||||
module.exports = function(options) {
|
||||
var log = logger.createLogger('device')
|
||||
|
@ -50,7 +49,7 @@ module.exports = function(options) {
|
|||
switch (wrapper.type) {
|
||||
case wire.MessageType.GROUP:
|
||||
var message = wire.GroupMessage.decode(wrapper.message)
|
||||
, groupChannel = message.group
|
||||
, groupChannel = message.channel
|
||||
if (wireutil.matchesRequirements(identity, message.requirements)) {
|
||||
channels.register(groupChannel, message.timeout)
|
||||
log.info('Subscribing to group channel "%s"', groupChannel)
|
||||
|
@ -70,6 +69,7 @@ module.exports = function(options) {
|
|||
break
|
||||
case wire.MessageType.SHELL_COMMAND:
|
||||
var message = wire.ShellCommandMessage.decode(wrapper.message)
|
||||
log.info('Running shell command "%s"', message.command.join(' '))
|
||||
adb.shellAsync(options.serial, message.command)
|
||||
.then(function(stream) {
|
||||
var resolver = Promise.defer()
|
||||
|
@ -115,7 +115,7 @@ module.exports = function(options) {
|
|||
log.error('Shell command "%s" failed due to "%s"'
|
||||
, message.command.join(' '), err.message)
|
||||
push.send([message.channel,
|
||||
wire.makeShellCommandFailMessage(options.serial, err.message)])
|
||||
wire.makeDeviceFailMessage(options.serial, err.message)])
|
||||
})
|
||||
break
|
||||
}
|
||||
|
|
|
@ -55,6 +55,15 @@ module.exports = function(wire) {
|
|||
, envelope: function(type, message) {
|
||||
return new wire.Envelope(type, message.encode()).encodeNB()
|
||||
}
|
||||
, makeGroupMessage: function(channel, timeout, requirements) {
|
||||
var message = new wire.GroupMessage(
|
||||
channel
|
||||
, timeout
|
||||
, requirements
|
||||
)
|
||||
|
||||
return wireutil.envelope(wire.MessageType.GROUP, message)
|
||||
}
|
||||
, makeJoinGroupMessage: function(serial) {
|
||||
var message = new wire.JoinGroupMessage(serial)
|
||||
return wireutil.envelope(wire.MessageType.JOIN_GROUP, message)
|
||||
|
|
|
@ -77,7 +77,7 @@ message DeviceRequirement {
|
|||
}
|
||||
|
||||
message GroupMessage {
|
||||
required string group = 1;
|
||||
required string channel = 1;
|
||||
required uint32 timeout = 2;
|
||||
repeated DeviceRequirement requirements = 3;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue