1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 02:09:32 +02:00

Don't rely on lucky timings when sending messages from the provider.

This commit is contained in:
Simo Kinnunen 2014-02-05 15:37:28 +09:00
parent 36d0af71b5
commit 91097dc569
6 changed files with 153 additions and 84 deletions

View file

@ -84,6 +84,12 @@ dbapi.setDeviceAbsent = function(serial) {
})) }))
} }
dbapi.setDeviceChannel = function(serial, channel) {
return db.run(r.table('devices').get(serial).update({
channel: channel
}))
}
dbapi.saveDeviceIdentity = function(serial, identity) { dbapi.saveDeviceIdentity = function(serial, identity) {
return db.run(r.table('devices').get(serial).update({ return db.run(r.table('devices').get(serial).update({
platform: identity.platform platform: identity.platform
@ -105,16 +111,4 @@ dbapi.loadDevice = function(serial) {
return db.run(r.table('devices').get(serial)) return db.run(r.table('devices').get(serial))
} }
dbapi.ensureDeviceSaved = function(serial) {
return dbapi.loadDevice(serial)
.then(function(device) {
if (!device) {
return Promise.delay(100).then(function() {
return dbapi.loadDevice(serial)
})
}
return device
})
}
module.exports = dbapi module.exports = dbapi

View file

@ -431,8 +431,13 @@ module.exports = function(options) {
.handler()) .handler())
function poke() { function poke() {
push.send([wireutil.global, push.send([
wireutil.makeDevicePokeMessage(options.serial, solo)]) wireutil.global
, wireutil.envelope(new wire.DevicePokeMessage(
options.serial
, solo
))
])
} }
function isGrouped() { function isGrouped() {

View file

@ -32,10 +32,20 @@ module.exports = function(options) {
}) })
devDealer.on('message', wirerouter() devDealer.on('message', wirerouter()
// Initial device message
.on(wire.DevicePresentMessage, function(channel, message, data) { .on(wire.DevicePresentMessage, function(channel, message, data) {
dbapi.saveDevice(message.serial, message) dbapi.saveDevice(message.serial, message)
appDealer.send([channel, data]) .then(function() {
devDealer.send([
message.provider.channel
, wireutil.envelope(new wire.DeviceRegisteredMessage(
message.serial
))
])
appDealer.send([channel, data])
})
}) })
// Workerless messages
.on(wire.DeviceAbsentMessage, function(channel, message, data) { .on(wire.DeviceAbsentMessage, function(channel, message, data) {
dbapi.setDeviceAbsent(message.serial) dbapi.setDeviceAbsent(message.serial)
appDealer.send([channel, data]) appDealer.send([channel, data])
@ -44,6 +54,17 @@ module.exports = function(options) {
dbapi.saveDeviceStatus(message.serial, message.status) dbapi.saveDeviceStatus(message.serial, message.status)
appDealer.send([channel, data]) appDealer.send([channel, data])
}) })
// Worker initialized
.on(wire.DevicePokeMessage, function(channel, message) {
dbapi.setDeviceChannel(message.serial, message.channel)
.then(function() {
devDealer.send([
message.channel
, wireutil.envelope(new wire.ProbeMessage())
])
})
})
// Worker messages
.on(wire.JoinGroupMessage, function(channel, message, data) { .on(wire.JoinGroupMessage, function(channel, message, data) {
dbapi.setDeviceOwner(message.serial, message.owner) dbapi.setDeviceOwner(message.serial, message.owner)
appDealer.send([channel, data]) appDealer.send([channel, data])
@ -56,12 +77,6 @@ module.exports = function(options) {
dbapi.saveDeviceLog(message.serial, message) dbapi.saveDeviceLog(message.serial, message)
appDealer.send([channel, data]) appDealer.send([channel, data])
}) })
.on(wire.DevicePokeMessage, function(channel, message) {
dbapi.ensureDeviceSaved(message.serial)
.then(function() {
devDealer.send([message.channel, wireutil.makeProbeMessage()])
})
})
.on(wire.DeviceIdentityMessage, function(channel, message, data) { .on(wire.DeviceIdentityMessage, function(channel, message, data) {
dbapi.saveDeviceIdentity(message.serial, message) dbapi.saveDeviceIdentity(message.serial, message)
appDealer.send([channel, data]) appDealer.send([channel, data])

View file

@ -9,12 +9,14 @@ var _ = require('lodash')
var logger = require('../util/logger') var logger = require('../util/logger')
var wire = require('../wire') var wire = require('../wire')
var wireutil = require('../wire/util') var wireutil = require('../wire/util')
var wirerouter = require('../wire/router')
var procutil = require('../util/procutil') var procutil = require('../util/procutil')
module.exports = function(options) { module.exports = function(options) {
var log = logger.createLogger('provider') var log = logger.createLogger('provider')
var client = Promise.promisifyAll(adb.createClient()) var client = Promise.promisifyAll(adb.createClient())
var workers = {} var workers = {}
var solo = wireutil.makePrivateChannel()
var lists = { var lists = {
all: [] all: []
, ready: [] , ready: []
@ -58,6 +60,19 @@ module.exports = function(options) {
push.connect(endpoint) push.connect(endpoint)
}) })
// Input
var sub = zmq.socket('sub')
options.endpoints.sub.forEach(function(endpoint) {
log.info('Receiving input from %s', endpoint)
sub.connect(endpoint)
})
// Establish always-on channels
;[solo].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
})
// Track and manage devices // Track and manage devices
client.trackDevicesAsync().then(function(tracker) { client.trackDevicesAsync().then(function(tracker) {
log.info('Tracking devices') log.info('Tracking devices')
@ -82,85 +97,119 @@ module.exports = function(options) {
tracker.on('add', filterDevice(function(device) { tracker.on('add', filterDevice(function(device) {
log.info('Found device "%s" (%s)', device.id, device.type) log.info('Found device "%s" (%s)', device.id, device.type)
// Tell others we found a device var privateTracker = new events.EventEmitter()
push.send([ , timer
wireutil.global , worker
, wireutil.envelope(new wire.DevicePresentMessage(
device.id // Wait for others to acknowledge the device
, options.name var register = new Promise(function(resolve, reject) {
, wireutil.toDeviceStatus(device.type) // Tell others we found a device
)) push.send([
]) wireutil.global
, wireutil.envelope(new wire.DevicePresentMessage(
device.id
, wireutil.toDeviceStatus(device.type)
, new wire.ProviderMessage(
solo
, options.name
)
))
])
privateTracker.once('register', resolve)
})
register.then(function() {
log.info('Registered device "%s"', device.id)
check()
})
// Statistics // Statistics
lists.all.push(device.id) lists.all.push(device.id)
delayedTotals() delayedTotals()
var privateTracker = new events.EventEmitter() // The device object will be kept up to date by the tracker, except
, resolver = Promise.defer() // our custom "present" property
, timer _.assign(device, {
, worker present: true
})
// When any event occurs on the added device // When any event occurs on the added device
function deviceListener(type, device) { function deviceListener(type) {
// Okay, this is a bit unnecessary but it allows us to get rid of an // Okay, this is a bit unnecessary but it allows us to get rid of an
// ugly switch statement and return to the original style. // ugly switch statement and return to the original style.
privateTracker.emit(type, device) privateTracker.emit(type)
} }
// When the added device changes // When the added device changes
function changeListener(device) { function changeListener() {
log.info('Device "%s" is now "%s"', device.id, device.type) register.then(function() {
log.info('Device "%s" is now "%s"', device.id, device.type)
// Tell others the device changed // Tell others the device changed
push.send([ push.send([
wireutil.global wireutil.global
, wireutil.envelope(new wire.DeviceStatusMessage( , wireutil.envelope(new wire.DeviceStatusMessage(
device.id device.id
, wireutil.toDeviceStatus(device.type) , wireutil.toDeviceStatus(device.type)
)) ))
]) ])
check(device) check()
})
} }
// When the added device gets removed // When the added device gets removed
function removeListener(device) { function removeListener() {
log.info('Lost device "%s" (%s)', device.id, device.type) register.then(function() {
log.info('Lost device "%s" (%s)', device.id, device.type)
clearTimeout(timer) clearTimeout(timer)
flippedTracker.removeListener(device.id, deviceListener) flippedTracker.removeListener(device.id, deviceListener)
_.pull(lists.all, device.id) _.pull(lists.all, device.id)
delayedTotals() delayedTotals()
// Tell others the device is gone // Tell others the device is gone
push.send([ push.send([
wireutil.global wireutil.global
, wireutil.envelope(new wire.DeviceAbsentMessage( , wireutil.envelope(new wire.DeviceAbsentMessage(
device.id device.id
)) ))
]) ])
stop() _.assign(device, {
present: false
})
check()
})
} }
// Check if we can do anything with the device // Check if we can do anything with the device
function check(device) { function check() {
clearTimeout(timer) clearTimeout(timer)
switch (device.type) {
case 'device': if (device.present) {
case 'emulator': // We might get multiple status updates in rapid succession,
timer = setTimeout(work, 100) // so let's wait for a while
break switch (device.type) {
default: case 'device':
stop() case 'emulator':
break timer = setTimeout(work, 100)
break
default:
timer = setTimeout(stop, 100)
break
}
}
else {
stop()
} }
} }
// Starts a device worker and keeps it alive // Starts a device worker and keeps it alive
function work() { function work() {
return worker = workers[device.id] = spawn(device) return worker = workers[device.id] = spawn()
.then(function() { .then(function() {
log.info('Device worker "%s" has retired', device.id) log.info('Device worker "%s" has retired', device.id)
worker = workers[device.id] = null worker = workers[device.id] = null
@ -188,7 +237,7 @@ module.exports = function(options) {
} }
// Spawn a device worker // Spawn a device worker
function spawn(device) { function spawn() {
var ports = options.ports.splice(0, 2) var ports = options.ports.splice(0, 2)
, proc = options.fork(device, ports) , proc = options.fork(device, ports)
, resolver = Promise.defer() , resolver = Promise.defer()
@ -272,7 +321,6 @@ module.exports = function(options) {
flippedTracker.on(device.id, deviceListener) flippedTracker.on(device.id, deviceListener)
privateTracker.on('change', changeListener) privateTracker.on('change', changeListener)
privateTracker.on('remove', removeListener) privateTracker.on('remove', removeListener)
check(device)
})) }))
tracker.on('change', filterDevice(function(device) { tracker.on('change', filterDevice(function(device) {
@ -282,6 +330,12 @@ module.exports = function(options) {
tracker.on('remove', filterDevice(function(device) { tracker.on('remove', filterDevice(function(device) {
flippedTracker.emit(device.id, 'remove', device) flippedTracker.emit(device.id, 'remove', device)
})) }))
sub.on('message', wirerouter()
.on(wire.DeviceRegisteredMessage, function(channel, message) {
flippedTracker.emit(message.serial, 'register')
})
.handler())
}) })
function gracefullyExit() { function gracefullyExit() {

View file

@ -40,12 +40,6 @@ var wireutil = {
, entry.identifier , entry.identifier
)) ))
} }
, makeDevicePokeMessage: function(serial, channel) {
return wireutil.envelope(new wire.DevicePokeMessage(
serial
, channel
))
}
, makeDeviceIdentityMessage: function(serial, identity) { , makeDeviceIdentityMessage: function(serial, identity) {
return wireutil.envelope(new wire.DeviceIdentityMessage( return wireutil.envelope(new wire.DeviceIdentityMessage(
serial serial
@ -78,9 +72,6 @@ var wireutil = {
}) })
)) ))
} }
, makeProbeMessage: function() {
return wireutil.envelope(new wire.ProbeMessage())
}
, makeShellCommandMessage: function(channel, command) { , makeShellCommandMessage: function(channel, command) {
return wireutil.envelope(new wire.ShellCommandMessage( return wireutil.envelope(new wire.ShellCommandMessage(
channel channel

View file

@ -25,6 +25,7 @@ enum MessageType {
KeyDownMessage = 23; KeyDownMessage = 23;
KeyUpMessage = 24; KeyUpMessage = 24;
KeyPressMessage = 25; KeyPressMessage = 25;
DeviceRegisteredMessage = 26;
} }
message Envelope { message Envelope {
@ -46,10 +47,19 @@ message DeviceLogMessage {
// Introductions // Introductions
message ProviderMessage {
required string channel = 1;
required string name = 2;
}
message DevicePresentMessage { message DevicePresentMessage {
required string serial = 1; required string serial = 1;
required string provider = 2; required DeviceStatus status = 2;
required DeviceStatus status = 4; required ProviderMessage provider = 3;
}
message DeviceRegisteredMessage {
required string serial = 1;
} }
message DeviceAbsentMessage { message DeviceAbsentMessage {