1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 18:29:17 +02:00

Switch to protocol buffers for communication.

This commit is contained in:
Simo Kinnunen 2014-01-22 16:34:07 +09:00
parent f60cf2008b
commit 90e405a341
8 changed files with 354 additions and 36 deletions

View file

@ -1,6 +1,8 @@
var zmq = require('zmq')
var logger = require('../util/logger')
var wire = require('../wire')
var wireutil = require('../util/wireutil')(wire)
module.exports = function(options) {
var log = logger.createLogger('coordinator')
@ -27,7 +29,19 @@ module.exports = function(options) {
devDealer.connect(endpoint)
})
devDealer.on('message', function() {
appDealer.send([].slice.call(arguments))
devDealer.on('message', function(channel, data) {
var wrapper = wire.Envelope.decode(data)
switch (wrapper.type) {
case wire.MessageType.DEVICE_POKE:
var message = wire.DevicePokeMessage.decode(wrapper.message)
devDealer.send([message.channel, wireutil.makeProbeMessage()])
break
case wire.MessageType.DEVICE_STATUS:
var message = wire.DeviceStatusMessage.decode(wrapper.message)
break
case wire.MessageType.DEVICE_PROPERTIES:
var message = wire.DevicePropertiesMessage.decode(wrapper.message)
break
}
})
}

View file

@ -4,13 +4,24 @@ var Promise = require('bluebird')
var zmq = require('zmq')
var adbkit = require('adbkit')
var logger = require('../util/logger')
var wire = require('../wire')
var wireutil = require('../util/wireutil')(wire)
var devutil = require('../util/devutil')
var ChannelManager = require('../wire/channelmanager')
module.exports = function(options) {
var logger = require('../util/logger')
var log = logger.createLogger('device')
var identity = Object.create(null)
var solo = wireutil.makePrivateChannel()
var channels = new ChannelManager()
// Show serial number in logs
logger.setGlobalIdentifier(options.serial)
// Adb
var adb = Promise.promisifyAll(adbkit.createClient())
// Input
var sub = zmq.socket('sub')
options.endpoints.sub.forEach(function(endpoint) {
@ -18,39 +29,47 @@ module.exports = function(options) {
sub.connect(endpoint)
})
sub.on('message', function(channel, id, cmd) {
push.send([id, options.serial, 'ACK'])
switch (cmd.toString()) {
case 'ls':
log.info('Responding to "ls"')
push.send([id, options.serial, 'OKY'])
// Establish always-on channels
;[wireutil.global, solo].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
channels.register(channel, Infinity)
})
// Unsubscribe from temporary channels when they timeout
channels.on('timeout', function(channel) {
log.info('Channel "%s" timed out', channel)
sub.unsubscribe(channel)
push.send([channel, wireutil.makeLeaveGroupMessage(options.serial)])
})
sub.on('message', function(channel, data) {
var wrapper = wire.Envelope.decode(data)
channels.keepalive(channel)
switch (wrapper.type) {
case wire.MessageType.GROUP:
var message = wire.GroupMessage.decode(wrapper.message)
, groupChannel = message.group
if (wireutil.matchesRequirements(identity, message.requirements)) {
channels.register(groupChannel, 600000)
log.info('Subscribing to group channel "%s"', groupChannel)
sub.subscribe(groupChannel)
push.send([groupChannel,
wireutil.makeJoinGroupMessage(options.serial)])
}
break
case 'shell':
var line = arguments[3]
log.info('Running shell command "%s"', line)
adb.shellAsync(options.serial, line)
.then(function(out) {
var chunks = []
out.on('data', function(chunk) {
chunks.push(chunk)
})
out.on('end', function(chunk) {
push.send([id, options.serial, 'OKY', Buffer.concat(chunks)])
})
case wire.MessageType.PROBE:
var message = wire.ProbeMessage.decode(wrapper.message)
adb.getPropertiesAsync(options.serial)
.then(function(properties) {
identity = devutil.makeIdentity(options.serial, properties)
push.send([channel,
wireutil.makeDevicePropertiesMessage(options.serial, properties)])
})
.catch(function(err) {
push.send([id, options.serial, 'ERR', err.message])
})
break
default:
log.warn('Unknown command "%s"', cmd)
break
}
})
// Respond to messages directed to everyone
sub.subscribe('ALL')
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
@ -58,11 +77,10 @@ module.exports = function(options) {
push.connect(endpoint)
})
// Introduce worker
// push.send(['HELO', options.serial])
// Adb
var adb = Promise.promisifyAll(adbkit.createClient())
function poke() {
push.send([wireutil.global,
wireutil.makeDevicePokeMessage(options.serial, solo)])
}
function gracefullyExit() {
log.info('Bye')
@ -76,4 +94,6 @@ module.exports = function(options) {
process.on('SIGTERM', function() {
gracefullyExit()
})
poke()
}

View file

@ -1,9 +1,15 @@
var path = require('path')
var adb = require('adbkit')
var Promise = require('bluebird')
var zmq = require('zmq')
var logger = require('../util/logger')
var wire = require('../wire')
var wireutil = require('../util/wireutil')(wire)
module.exports = function(options) {
var log = require('../util/logger').createLogger('provider')
var log = logger.createLogger('provider')
var client = Promise.promisifyAll(adb.createClient())
var workers = Object.create(null)
@ -43,6 +49,11 @@ module.exports = function(options) {
})
})
function pushDeviceStatus(device, type) {
push.send([wireutil.global,
wireutil.makeDeviceStatusMessage(device.id, type)])
}
function isWantedDevice(device) {
return options.filter ? options.filter(device) : true
}
@ -63,6 +74,7 @@ module.exports = function(options) {
function maybeConnect(device) {
if (isConnectable(device) && !isConnected(device)) {
pushDeviceStatus(device, device.type)
log.info('Spawning worker for device "%s"', device.id)
var proc = options.fork(device)
proc.on('error', function(err) {
@ -100,6 +112,7 @@ module.exports = function(options) {
function maybeDisconnect(device) {
if (isConnected(device)) {
pushDeviceStatus(device, 'absent')
log.info('Releasing worker of %s', device.id)
gracefullyKillWorker(device.id, function() { /* noop */ })
return true

33
lib/util/devutil.js Normal file
View file

@ -0,0 +1,33 @@
module.exports = {
makeIdentity: function(serial, properties) {
var model = properties['ro.product.model']
, brand = properties['ro.product.brand']
, manufacturer = properties['ro.product.manufacturer']
, version = properties['ro.build.version.release']
, sdk = +properties['ro.build.version.sdk']
, abi = properties['ro.product.cpu.abi']
// Remove brand prefix for consistency
if (model.substr(0, brand.length) === brand) {
model = model.substr(brand.length)
}
// Remove manufacturer prefix for consistency
if (model.substr(0, manufacturer.length) === manufacturer) {
model = model.substr(manufacturer.length)
}
// Clean up remaining model name
model = model.replace(/[_ ]/g, '')
return {
platform: 'android'
, serial: serial
, manufacturer: manufacturer
, model: model
, version: version
, sdk: sdk
, abi: abi
}
}
}

95
lib/util/wireutil.js Normal file
View file

@ -0,0 +1,95 @@
var semver = require('semver')
var minimatch = require('minimatch')
var uuid = require('node-uuid')
module.exports = function(wire) {
var wireutil = {
global: '*ALL'
, makePrivateChannel: function() {
return uuid.v4(null, new Buffer(16)).toString('base64')
}
, toDeviceStatus: function(type) {
return wire.DeviceStatus[{
device: 'ONLINE'
, emulator: 'ONLINE'
, unauthorized: 'UNAUTHORIZED'
, offline: 'OFFLINE'
, absent: 'ABSENT'
}[type]]
}
, toDeviceType: function(type) {
return wire.DeviceStatus[{
device: 'PHYSICAL'
, emulator: 'VIRTUAL'
}[type]]
}
, matchesRequirements: function(capabilities, requirements) {
return requirements.every(function(req) {
var capability = capabilities[req.name]
if (!capability) {
return false
}
switch (req.type) {
case wire.RequirementType.SEMVER:
if (!semver.satisfies(capability, req.value)) {
return false
}
break
case wire.RequirementType.GLOB:
if (!minimatch(capability, req.value)) {
return false
}
break
case wire.RequirementType.EXACT:
if (capability !== req.value) {
return false
}
break
default:
return false
}
})
}
, envelope: function(type, message) {
return new wire.Envelope(type, message.encode()).encodeNB()
}
, makeJoinGroupMessage: function(serial) {
var message = new wire.JoinGroupMessage(serial)
return wireutil.envelope(wire.MessageType.JOIN_GROUP, message)
}
, makeLeaveGroupMessage: function(serial) {
var message = new wire.LeaveGroupMessage(serial)
return wireutil.envelope(wire.MessageType.LEAVE_GROUP, message)
}
, makeDevicePokeMessage: function(serial, channel) {
var message = new wire.DevicePokeMessage(serial, channel)
return wireutil.envelope(wire.MessageType.DEVICE_POKE, message)
}
, makeDevicePropertiesMessage: function(serial, properties) {
var message = new wire.DevicePropertiesMessage(
serial
, Object.keys(properties).map(function(key) {
return new wire.DeviceProperty(key, properties[key])
})
)
return wireutil.envelope(wire.MessageType.DEVICE_PROPERTIES, message)
}
, makeDeviceStatusMessage: function(serial, type) {
var message = new wire.DeviceStatusMessage(
serial
, wireutil.toDeviceStatus(type)
)
return wireutil.envelope(wire.MessageType.DEVICE_STATUS, message)
}
, makeProbeMessage: function() {
var message = new wire.ProbeMessage()
return wireutil.envelope(wire.MessageType.PROBE, message)
}
}
return wireutil
}

View file

@ -0,0 +1,51 @@
var events = require('events')
var util = require('util')
function ChannelManager() {
events.EventEmitter.call(this)
this.channels = Object.create(null)
}
util.inherits(ChannelManager, events.EventEmitter)
ChannelManager.prototype.register = function(id, timeout) {
this.channels[id] = {
timeout: timeout
, lastActivity: Date.now()
, timer: null
}
// Set timer with initial check
this.check(id)
}
ChannelManager.prototype.unregister = function(id) {
var channel = this.channels[id]
delete this.channels[id]
clearInterval(channel.timer)
}
ChannelManager.prototype.keepalive = function(id) {
var channel = this.channels[id]
if (channel) {
channel.lastActivity = Date.now()
}
}
ChannelManager.prototype.check = function(id) {
var channel = this.channels[id]
, inactivePeriod = Date.now() - channel.lastActivity
if (inactivePeriod >= channel.timeout) {
this.unregister(id)
this.emit('timeout', id)
}
else if (channel.timeout < Infinity) {
channel.timer = setTimeout(
this.check.bind(this, id)
, channel.timeout - inactivePeriod
)
}
}
module.exports = ChannelManager

6
lib/wire/index.js Normal file
View file

@ -0,0 +1,6 @@
var path = require('path')
var ProtoBuf = require('protobufjs')
module.exports =
ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build()

86
lib/wire/wire.proto Normal file
View file

@ -0,0 +1,86 @@
// Message wrapper
enum MessageType {
DEVICE_POKE = 1;
DEVICE_STATUS = 2;
DEVICE_TYPE = 3;
DEVICE_PROPERTIES = 4;
GROUP = 5;
JOIN_GROUP = 6;
LEAVE_GROUP = 7;
PROBE = 8;
}
message Envelope {
required MessageType type = 1;
required bytes message = 2;
}
// Introductions
message DevicePokeMessage {
required string serial = 1;
required string channel = 2;
}
message ProbeMessage {
}
enum DeviceStatus {
ABSENT = 1;
OFFLINE = 2;
UNAUTHORIZED = 3;
ONLINE = 4;
}
message DeviceStatusMessage {
required string serial = 1;
required DeviceStatus status = 2;
}
enum DeviceType {
PHYSICAL = 1;
VIRTUAL = 2;
}
message DeviceTypeMessage {
required string serial = 1;
required DeviceType type = 2;
}
message DeviceProperty {
required string name = 1;
required string value = 2;
}
message DevicePropertiesMessage {
required string serial = 1;
repeated DeviceProperty properties = 2;
}
// Grouping
enum RequirementType {
SEMVER = 1;
GLOB = 2;
EXACT = 3;
}
message DeviceRequirement {
required string name = 1;
required string value = 2;
required RequirementType type = 3;
}
message GroupMessage {
required string group = 1;
repeated DeviceRequirement requirements = 2;
}
message JoinGroupMessage {
required string serial = 1;
}
message LeaveGroupMessage {
required string serial = 1;
}