1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 10:19:30 +02:00

Initial version of transactions. Shell command implemented as an example. Still needs channel cleanup on app side, handling a device dying in the middle of a transaction, and getting device list back to normal.

This commit is contained in:
Simo Kinnunen 2014-02-21 11:43:51 +09:00
parent 84207e1f36
commit 8769b8040c
19 changed files with 433 additions and 181 deletions

View file

@ -8,7 +8,8 @@
"se7en-bootstrap-3": "git@ghe.amb.ca.local:stf/se7en-bootstrap-3.git", "se7en-bootstrap-3": "git@ghe.amb.ca.local:stf/se7en-bootstrap-3.git",
"socket.io-client": "~0.9.16", "socket.io-client": "~0.9.16",
"oboe": "~1.12.2", "oboe": "~1.12.2",
"lodash": "~2.4.1" "lodash": "~2.4.1",
"bluebird": "~1.0.6"
}, },
"private": true "private": true
} }

View file

@ -8,6 +8,7 @@ var validator = require('express-validator')
var socketio = require('socket.io') var socketio = require('socket.io')
var zmq = require('zmq') var zmq = require('zmq')
var Promise = require('bluebird') var Promise = require('bluebird')
var _ = require('lodash')
var logger = require('../util/logger') var logger = require('../util/logger')
var pathutil = require('../util/pathutil') var pathutil = require('../util/pathutil')
@ -199,6 +200,41 @@ module.exports = function(options) {
var channels = [] var channels = []
, user = socket.handshake.user , user = socket.handshake.user
function joinChannel(channel) {
channels.push(channel)
groupRouter.on(channel, messageListener)
sub.subscribe(channel)
}
function leaveChannel(channel) {
_.pull(channels, channel)
groupRouter.removeListener(channel, messageListener)
sub.unsubscribe(channel)
}
function createTouchHandler(klass) {
return function(channel, data) {
push.send([
channel
, wireutil.envelope(new klass(
data.x
, data.y
))
])
}
}
function createKeyHandler(klass) {
return function(channel, data) {
push.send([
channel
, wireutil.envelope(new klass(
data.key
))
])
}
}
var messageListener = wirerouter() var messageListener = wirerouter()
.on(wire.JoinGroupMessage, function(channel, message) { .on(wire.JoinGroupMessage, function(channel, message) {
socket.emit('group.join', message) socket.emit('group.join', message)
@ -218,6 +254,13 @@ module.exports = function(options) {
.on(wire.DeviceIdentityMessage, function(channel, message) { .on(wire.DeviceIdentityMessage, function(channel, message) {
socket.emit('device.identity', message) socket.emit('device.identity', message)
}) })
.on(wire.TransactionProgressMessage, function(channel, message) {
socket.emit('tx.progress', channel.toString(), message)
})
.on(wire.TransactionDoneMessage, function(channel, message) {
//leaveChannel(channel)
socket.emit('tx.done', channel.toString(), message)
})
.handler() .handler()
// Global messages // Global messages
@ -228,20 +271,19 @@ module.exports = function(options) {
groupRouter.on(wireutil.global, messageListener) groupRouter.on(wireutil.global, messageListener)
// User's private group // User's private group
channels.push(user.group) joinChannel(user.group)
sub.subscribe(user.group)
groupRouter.on(user.group, messageListener)
socket
// Clean up all listeners and subscriptions // Clean up all listeners and subscriptions
socket.on('disconnect', function() { .on('disconnect', function() {
groupRouter.removeListener(wireutil.global, messageListener) groupRouter.removeListener(wireutil.global, messageListener)
channels.forEach(function(channel) { channels.forEach(function(channel) {
groupRouter.removeListener(channel, messageListener) groupRouter.removeListener(channel, messageListener)
sub.unsubscribe(channel) sub.unsubscribe(channel)
}) })
}) })
// Grouping
socket.on('group.invite', function(data) { .on('group.invite', function(data) {
push.send([ push.send([
wireutil.global wireutil.global
, wireutil.envelope(new wire.GroupMessage( , wireutil.envelope(new wire.GroupMessage(
@ -255,8 +297,7 @@ module.exports = function(options) {
)) ))
]) ])
}) })
.on('group.kick', function(data) {
socket.on('group.kick', function(data) {
push.send([ push.send([
user.group user.group
, wireutil.envelope(new wire.UngroupMessage( , wireutil.envelope(new wire.UngroupMessage(
@ -264,40 +305,16 @@ module.exports = function(options) {
)) ))
]) ])
}) })
// Touch events
function touchSender(klass) { .on('input.touchDown', createTouchHandler(wire.TouchDownMessage))
return function(channel, data) { .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage))
push.send([ .on('input.touchUp', createTouchHandler(wire.TouchUpMessage))
channel .on('input.tap', createTouchHandler(wire.TapMessage))
, wireutil.envelope(new klass( // Key events
data.x .on('input.keyDown', createKeyHandler(wire.KeyDownMessage))
, data.y .on('input.keyUp', createKeyHandler(wire.KeyUpMessage))
)) .on('input.keyPress', createKeyHandler(wire.KeyPressMessage))
]) .on('input.type', function(channel, data) {
}
}
function keySender(klass) {
return function(channel, data) {
push.send([
channel
, wireutil.envelope(new klass(
data.key
))
])
}
}
socket.on('input.touchDown', touchSender(wire.TouchDownMessage))
socket.on('input.touchMove', touchSender(wire.TouchMoveMessage))
socket.on('input.touchUp', touchSender(wire.TouchUpMessage))
socket.on('input.tap', touchSender(wire.TapMessage))
socket.on('input.keyDown', keySender(wire.KeyDownMessage))
socket.on('input.keyUp', keySender(wire.KeyUpMessage))
socket.on('input.keyPress', keySender(wire.KeyPressMessage))
socket.on('input.type', function(channel, data) {
push.send([ push.send([
channel channel
, wireutil.envelope(new wire.TypeMessage( , wireutil.envelope(new wire.TypeMessage(
@ -305,7 +322,25 @@ module.exports = function(options) {
)) ))
]) ])
}) })
// Transactions
.on('shell.command', function(channel, responseChannel, data) {
joinChannel(responseChannel)
push.send([
channel
, wireutil.transaction(
responseChannel
, new wire.ShellCommandMessage(data)
)
])
})
.on('shell.keepalive', function(channel, data) {
push.send([
channel
, wireutil.envelope(new wire.ShellKeepAliveMessage(data))
])
})
/*
socket.on('flick', function(data) {}) socket.on('flick', function(data) {})
socket.on('back', function(data) {}) socket.on('back', function(data) {})
socket.on('forward', function(data) {}) socket.on('forward', function(data) {})
@ -338,7 +373,7 @@ module.exports = function(options) {
socket.on('selenium.allCookies', function(data) {}) socket.on('selenium.allCookies', function(data) {})
socket.on('forward.unset', function(data) {}) socket.on('forward.unset', function(data) {})
socket.on('forward.list', function(data) {}) socket.on('forward.list', function(data) {})
*/
//this._react 'forward.test', (data = {}) => //this._react 'forward.test', (data = {}) =>
// this._runTransaction 'forward.test', // this._runTransaction 'forward.test',
// this._insertOptionalIp data, 'targetHost' // this._insertOptionalIp data, 'targetHost'

View file

@ -334,24 +334,24 @@ module.exports = function(options) {
}) })
sub.on('message', wirerouter() sub.on('message', wirerouter()
.on('message', function(channel) {
channels.keepalive(channel)
})
.on(wire.ProbeMessage, function(channel, message) { .on(wire.ProbeMessage, function(channel, message) {
push.send([wireutil.global, push.send([wireutil.global,
wireutil.makeDeviceIdentityMessage(options.serial, identity)]) wireutil.makeDeviceIdentityMessage(options.serial, identity)])
channels.keepalive(channel)
}) })
.on(wire.GroupMessage, function(channel, message) { .on(wire.GroupMessage, function(channel, message) {
if (!isGrouped() && if (!isGrouped() &&
devutil.matchesRequirements(identity, message.requirements)) { devutil.matchesRequirements(identity, message.requirements)) {
joinGroup(message.owner, message.timeout) joinGroup(message.owner, message.timeout)
} }
channels.keepalive(channel)
}) })
.on(wire.UngroupMessage, function(channel, message) { .on(wire.UngroupMessage, function(channel, message) {
if (isGrouped() && if (isGrouped() &&
devutil.matchesRequirements(identity, message.requirements)) { devutil.matchesRequirements(identity, message.requirements)) {
leaveGroup() leaveGroup()
} }
channels.keepalive(channel)
}) })
.on(wire.TouchDownMessage, function(channel, message) { .on(wire.TouchDownMessage, function(channel, message) {
services.input.touchDownAsync(message.x, message.y) services.input.touchDownAsync(message.x, message.y)
@ -409,55 +409,85 @@ module.exports = function(options) {
}) })
}) })
.on(wire.ShellCommandMessage, function(channel, message) { .on(wire.ShellCommandMessage, function(channel, message) {
log.info('Running shell command "%s"', message.command.join(' ')) var router = this
, seq = 0
log.info('Running shell command "%s"', message.command)
adb.shellAsync(options.serial, message.command) adb.shellAsync(options.serial, message.command)
.then(function(stream) { .then(function(stream) {
var resolver = Promise.defer() var resolver = Promise.defer()
, seq = 0 , timer
function dataListener(chunk) { function keepAliveListener(channel, message) {
push.send([message.channel, clearTimeout(timer)
wireutil.makeShellCommandDataMessage( timer = setTimeout(forceStop, message.timeout)
}
function readableListener() {
var chunk
while (chunk = stream.read()) {
push.send([
channel
, wireutil.envelope(new wire.TransactionProgressMessage(
options.serial options.serial
, seq++ , seq++
, chunk , chunk
)]) ))
])
}
} }
function endListener() { function endListener() {
push.send([message.channel, push.send([
wireutil.makeShellCommandDoneMessage(options.serial)]) channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, true
))
])
resolver.resolve() resolver.resolve()
} }
function errorListener(err) { function errorListener(err) {
log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message)
resolver.reject(err) resolver.reject(err)
push.send([message.channel,
wireutil.makeShellCommandFailMessage(
options.serial
, err.message
)])
} }
stream.on('data', dataListener) function forceStop() {
stream.end()
}
stream.on('readable', readableListener)
stream.on('end', endListener) stream.on('end', endListener)
stream.on('error', errorListener) stream.on('error', errorListener)
sub.subscribe(channel)
router.on(wire.ShellKeepAliveMessage, keepAliveListener)
timer = setTimeout(forceStop, message.timeout)
return resolver.promise.finally(function() { return resolver.promise.finally(function() {
stream.removeListener('data', dataListener) stream.removeListener('readable', readableListener)
stream.removeListener('end', endListener) stream.removeListener('end', endListener)
stream.removeListener('error', errorListener) stream.removeListener('error', errorListener)
sub.unsubscribe(channel)
router.removeListener(wire.ShellKeepAliveMessage, keepAliveListener)
clearTimeout(timer)
}) })
}) })
.error(function(err) { .error(function(err) {
log.error('Shell command "%s" failed due to "%s"' log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message) , message.command, err.message)
push.send([message.channel, push.send([
wire.makeShellCommandFailMessage(options.serial, err.message)]) channel
, wireutil.envelope(new wire.TransactionDoneMessage(
options.serial
, seq++
, false
, err.message
))
])
}) })
channels.keepalive(channel)
}) })
.handler()) .handler())

View file

@ -81,13 +81,10 @@ module.exports = function(options) {
dbapi.saveDeviceIdentity(message.serial, message) dbapi.saveDeviceIdentity(message.serial, message)
appDealer.send([channel, data]) appDealer.send([channel, data])
}) })
.on(wire.ShellCommandDataMessage, function(channel, message, data) { .on(wire.TransactionProgressMessage, function(channel, message, data) {
appDealer.send([channel, data]) appDealer.send([channel, data])
}) })
.on(wire.ShellCommandDoneMessage, function(channel, message, data) { .on(wire.TransactionDoneMessage, function(channel, message, data) {
appDealer.send([channel, data])
})
.on(wire.ShellCommandFailMessage, function(channel, message, data) {
appDealer.send([channel, data]) appDealer.send([channel, data])
}) })
.handler()) .handler())

View file

@ -8,6 +8,9 @@ wire.ReverseMessageType = Object.keys(wire.MessageType)
.reduce( .reduce(
function(acc, type) { function(acc, type) {
var code = wire.MessageType[type] var code = wire.MessageType[type]
if (!wire[type]) {
throw new Error('wire.MessageType has unknown value "' + type + '"')
}
wire[type].$code = wire[type].prototype.$code = code wire[type].$code = wire[type].prototype.$code = code
acc[code] = type acc[code] = type
return acc return acc

View file

@ -34,10 +34,14 @@ Router.prototype.handler = function() {
if (type) { if (type) {
this.emit( this.emit(
wrapper.type wrapper.type
, channel , wrapper.channel || channel
, wire[type].decode(wrapper.message) , wire[type].decode(wrapper.message)
, data , data
) )
this.emit(
'message'
, channel
)
} }
else { else {
log.warn( log.warn(

View file

@ -29,6 +29,14 @@ var wireutil = {
, envelope: function(message) { , envelope: function(message) {
return new wire.Envelope(message.$code, message.encode()).encodeNB() return new wire.Envelope(message.$code, message.encode()).encodeNB()
} }
, transaction: function(channel, message) {
return new wire.Envelope(
message.$code
, message.encode()
, channel
)
.encodeNB()
}
, makeDeviceLogMessage: function(serial, entry) { , makeDeviceLogMessage: function(serial, entry) {
return wireutil.envelope(new wire.DeviceLogMessage( return wireutil.envelope(new wire.DeviceLogMessage(
serial serial

View file

@ -10,9 +10,8 @@ enum MessageType {
LeaveGroupMessage = 7; LeaveGroupMessage = 7;
ProbeMessage = 8; ProbeMessage = 8;
ShellCommandMessage = 9; ShellCommandMessage = 9;
ShellCommandDataMessage = 10; TransactionProgressMessage = 10;
ShellCommandDoneMessage = 11; TransactionDoneMessage = 11;
ShellCommandFailMessage = 12;
DeviceIdentityMessage = 13; DeviceIdentityMessage = 13;
DeviceLogMessage = 14; DeviceLogMessage = 14;
DevicePresentMessage = 16; DevicePresentMessage = 16;
@ -28,11 +27,26 @@ enum MessageType {
DeviceRegisteredMessage = 26; DeviceRegisteredMessage = 26;
DeviceLogcatEntryMessage = 27; DeviceLogcatEntryMessage = 27;
LogcatApplyFiltersMessage = 28; LogcatApplyFiltersMessage = 28;
ShellKeepAliveMessage = 29;
} }
message Envelope { message Envelope {
required MessageType type = 1; required MessageType type = 1;
required bytes message = 2; required bytes message = 2;
optional string channel = 3;
}
message TransactionProgressMessage {
required string serial = 1;
required uint32 seq = 2;
optional string data = 3;
}
message TransactionDoneMessage {
required string serial = 1;
required uint32 seq = 2;
required bool success = 3;
optional string data = 4;
} }
// Logging // Logging
@ -257,21 +271,10 @@ message LogcatApplyFiltersMessage {
// Commands // Commands
message ShellCommandMessage { message ShellCommandMessage {
required string channel = 1; required string command = 1;
repeated string command = 2; required uint32 timeout = 2;
} }
message ShellCommandDataMessage { message ShellKeepAliveMessage {
required string serial = 1; required uint32 timeout = 1;
required uint32 seq = 2;
required bytes data = 3;
}
message ShellCommandDoneMessage {
required string serial = 1;
}
message ShellCommandFailMessage {
required string serial = 1;
required string reason = 2;
} }

View file

@ -1,8 +1,8 @@
module.exports = function ControlServiceFactory($rootScope, socket) { module.exports = function ControlServiceFactory($rootScope, socket, TransactionService) {
var controlService = { var controlService = {
} }
function ControlService(channel) { function ControlService(devices, channel) {
var keyCodes = { var keyCodes = {
8: 8 // backspace 8: 8 // backspace
, 13: 13 // enter , 13: 13 // enter
@ -35,15 +35,16 @@ module.exports = function ControlServiceFactory($rootScope, socket) {
} }
function touchSender(type) { function touchSender(type) {
return function (x, y) { return function(x, y) {
socket.emit(type, channel, { socket.emit(type, channel, {
x: x, y: y x: x
, y: y
}) })
} }
} }
function keySender(type, fixedKey) { function keySender(type, fixedKey) {
return function (key) { return function(key) {
var mapped = fixedKey || keyCodes[key] var mapped = fixedKey || keyCodes[key]
if (mapped) { if (mapped) {
socket.emit(type, channel, { socket.emit(type, channel, {
@ -66,15 +67,28 @@ module.exports = function ControlServiceFactory($rootScope, socket) {
this.menu = keySender('input.keyPress', 93) this.menu = keySender('input.keyPress', 93)
this.back = keySender('input.keyPress', 4) this.back = keySender('input.keyPress', 4)
this.type = function (text) { this.type = function(text) {
socket.emit('input.type', channel, { socket.emit('input.type', channel, {
text: text text: text
}) })
} }
this.shell = function(command) {
var tx = TransactionService.create(devices)
socket.emit('shell.command', channel, tx.channel, {
command: command
, timeout: 10000
})
return tx
}
} }
controlService.forChannel = function (channel) { controlService.forOne = function(device, channel) {
return new ControlService(channel) return new ControlService([device], channel)
}
controlService.forMany = function(devices, channel) {
return new ControlService(devices, channel)
} }
return controlService return controlService

View file

@ -1,4 +1,5 @@
module.exports = angular.module('stf/control', [ module.exports = angular.module('stf/control', [
require('stf/socket').name require('stf/socket').name
]) ])
.factory('TransactionService', require('./transaction-service'))
.factory('ControlService', require('./control-service')) .factory('ControlService', require('./control-service'))

View file

@ -0,0 +1,124 @@
var Promise = require('bluebird')
module.exports = function TransactionServiceFactory(socket) {
var transactionService = {}
function createChannel() {
return 'tx' + Date.now() // @todo UUID
}
function Transaction(devices) {
var pending = Object.create(null)
, results = []
, channel = createChannel()
, resolver = Promise.defer()
function doneListener(someChannel, data) {
if (someChannel === channel) {
pending[data.serial].done(data)
}
}
function progressListener(someChannel, data) {
if (someChannel === channel) {
pending[data.serial].progress(data)
}
}
socket.on('tx.done', doneListener)
socket.on('tx.progress', progressListener)
this.channel = channel
this.results = results
this.promise = Promise.settle(devices.map(function(device) {
var pendingResult = new PendingTransactionResult(device)
pending[device.serial] = pendingResult
results.push(pendingResult.result)
return pendingResult.promise
}))
.finally(function() {
socket.removeListener('tx.done', doneListener)
socket.removeListener('tx.progress', progressListener)
})
.progressed(function() {
console.log('progressing')
return results
})
.then(function() {
return results
})
}
function PendingTransactionResult(device) {
var resolver = Promise.defer()
, result = new TransactionResult(device)
, seq = 0
, last = null
, error = null
, unplaced = []
resolver.promise.finally(function() {
result.settled = true
})
function readQueue() {
var message
, foundAny = false
while (message = unplaced[seq]) {
unplaced[seq] = void 0
if (seq === last) {
result.success = message.success
if (message.success) {
if (message.data) {
result.data[seq] = message.data
}
}
else {
result.error = message.data
}
resolver.resolve(result)
return
}
foundAny = true
result.data[seq++] = message.data
}
if (foundAny) {
resolver.progress(result)
}
}
this.progress = function(message) {
unplaced[message.seq] = message
readQueue()
}
this.done = function(message) {
last = message.seq
unplaced[message.seq] = message
readQueue()
}
this.result = result
this.promise = resolver.promise
}
function TransactionResult(device) {
this.device = device
this.settled = false
this.success = false
this.error = null
this.data = []
}
transactionService.create = function(devices) {
return new Transaction(devices)
}
return transactionService
}

View file

@ -1,5 +1,6 @@
var oboe = require('oboe') var oboe = require('oboe')
var _ = require('lodash') var _ = require('lodash')
var Promise = require('bluebird')
module.exports = function DeviceServiceFactory($rootScope, $http, socket) { module.exports = function DeviceServiceFactory($rootScope, $http, socket) {
var deviceService = { var deviceService = {
@ -69,6 +70,13 @@ module.exports = function DeviceServiceFactory($rootScope, $http, socket) {
insert(device) insert(device)
}) })
deviceService.list = function () {
return $http.get('/api/v1/devices')
.then(function(response) {
return response.data.devices
})
}
deviceService.get = function (serial) { deviceService.get = function (serial) {
return $http.get('/api/v1/devices/' + serial) return $http.get('/api/v1/devices/' + serial)
.then(function (response) { .then(function (response) {

View file

@ -5,7 +5,7 @@ module.exports = function DeviceScreenCtrl($scope, ScalingService) {
$scope.showScreen = true $scope.showScreen = true
$scope.ScalingService = ScalingService $scope.ScalingService = ScalingService
$scope.promiseOfDevice.then(function () { $scope.device.promise.then(function() {
$scope.ready = true $scope.ready = true
}) })
} }

View file

@ -5,7 +5,7 @@ module.exports = function DeviceScreenDirective($document, ScalingService, $root
restrict: 'E', restrict: 'E',
template: require('./screen.jade'), template: require('./screen.jade'),
link: function (scope, element, attrs) { link: function (scope, element, attrs) {
scope.promiseOfDevice.then(function (device) { scope.device.promise.then(function(device) {
var loader = new Image() var loader = new Image()
, canvas = element.find('canvas')[0] , canvas = element.find('canvas')[0]
, finger = element.find('span') , finger = element.find('span')

View file

@ -1,11 +1,11 @@
module.exports = function DeviceControlCtrl($scope, $routeParams, DeviceService, ControlService) { module.exports = function DeviceControlCtrl($scope, $routeParams, DeviceService, ControlService) {
$scope.device = null
$scope.control = null $scope.control = null
$scope.device = {
$scope.promiseOfDevice = DeviceService.get($routeParams.serial) promise: DeviceService.get($routeParams.serial)
.then(function (device) { .then(function(device) {
$scope.device = device $scope.device.value = device
$scope.control = ControlService.forChannel(device.channel) $scope.control = ControlService.forOne(device, device.channel)
return device return device
}) })
}
} }

View file

@ -1,4 +1,4 @@
h1 {{ device.serial }} h1 {{ device.value.serial }}
button(ng-click='showScreen = !showScreen') Show/Hide button(ng-click='showScreen = !showScreen') Show/Hide

View file

@ -1,9 +1,16 @@
h1 Devices4 h1 Devices4
ul.device-list ul.device-list
li(ng-repeat='device in devices track by device.serial') li(ng-repeat='device in devices.value track by device.serial')
span {{ device.serial }} {{ device.present ? 'present' : 'absent' }} {{ device.owner.email }} span {{ device.serial }} {{ device.present ? 'present' : 'absent' }} {{ device.owner.email }}
a(href='#!/devices/{{ device.serial }}') Linky a(href='#!/devices/{{ device.serial }}') Linky
button(ng-click="invite(device)") invite button(ng-click="invite(device)") invite
button(ng-click="kick(device)") kick button(ng-click="kick(device)") kick
div(ng-controller='ShellCommandCtrl')
input(type=text, ng-model='command')
button(ng-click='run(command)') run
table
tr(ng-repeat='result in results track by result.device.serial')
td {{ result.device.serial }}
td {{ result.data }}

View file

@ -12,3 +12,4 @@ module.exports = angular.module('device-list', [
}) })
}]) }])
.controller('DeviceListCtrl', require('./device-list-controller')) .controller('DeviceListCtrl', require('./device-list-controller'))
.controller('ShellCommandCtrl', require('./shell-controller'))

View file

@ -0,0 +1,16 @@
module.exports = function ShellCommandCtrl($scope) {
$scope.results = []
$scope.run = function(command) {
var cmd = $scope.control.shell(command)
return cmd.promise
.progressed(function(results) {
$scope.results = results
$scope.$digest()
})
.then(function(results) {
$scope.results = results
$scope.$digest()
})
}
}