diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js index b6559d6a..20a73583 100644 --- a/lib/roles/coordinator.js +++ b/lib/roles/coordinator.js @@ -1,6 +1,8 @@ var zmq = require('zmq') var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) module.exports = function(options) { var log = logger.createLogger('coordinator') @@ -27,7 +29,19 @@ module.exports = function(options) { devDealer.connect(endpoint) }) - devDealer.on('message', function() { - appDealer.send([].slice.call(arguments)) + devDealer.on('message', function(channel, data) { + var wrapper = wire.Envelope.decode(data) + switch (wrapper.type) { + case wire.MessageType.DEVICE_POKE: + var message = wire.DevicePokeMessage.decode(wrapper.message) + devDealer.send([message.channel, wireutil.makeProbeMessage()]) + break + case wire.MessageType.DEVICE_STATUS: + var message = wire.DeviceStatusMessage.decode(wrapper.message) + break + case wire.MessageType.DEVICE_PROPERTIES: + var message = wire.DevicePropertiesMessage.decode(wrapper.message) + break + } }) } diff --git a/lib/roles/device.js b/lib/roles/device.js index a3e36ea0..88bfcf59 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -4,13 +4,24 @@ var Promise = require('bluebird') var zmq = require('zmq') var adbkit = require('adbkit') +var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) +var devutil = require('../util/devutil') +var ChannelManager = require('../wire/channelmanager') + module.exports = function(options) { - var logger = require('../util/logger') var log = logger.createLogger('device') + var identity = Object.create(null) + var solo = wireutil.makePrivateChannel() + var channels = new ChannelManager() // Show serial number in logs logger.setGlobalIdentifier(options.serial) + // Adb + var adb = Promise.promisifyAll(adbkit.createClient()) + // Input var sub = zmq.socket('sub') options.endpoints.sub.forEach(function(endpoint) { @@ -18,39 +29,47 @@ module.exports = function(options) { sub.connect(endpoint) }) - sub.on('message', function(channel, id, cmd) { - push.send([id, options.serial, 'ACK']) - switch (cmd.toString()) { - case 'ls': - log.info('Responding to "ls"') - push.send([id, options.serial, 'OKY']) + // Establish always-on channels + ;[wireutil.global, solo].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + channels.register(channel, Infinity) + }) + + // Unsubscribe from temporary channels when they timeout + channels.on('timeout', function(channel) { + log.info('Channel "%s" timed out', channel) + sub.unsubscribe(channel) + push.send([channel, wireutil.makeLeaveGroupMessage(options.serial)]) + }) + + sub.on('message', function(channel, data) { + var wrapper = wire.Envelope.decode(data) + channels.keepalive(channel) + switch (wrapper.type) { + case wire.MessageType.GROUP: + var message = wire.GroupMessage.decode(wrapper.message) + , groupChannel = message.group + if (wireutil.matchesRequirements(identity, message.requirements)) { + channels.register(groupChannel, 600000) + log.info('Subscribing to group channel "%s"', groupChannel) + sub.subscribe(groupChannel) + push.send([groupChannel, + wireutil.makeJoinGroupMessage(options.serial)]) + } break - case 'shell': - var line = arguments[3] - log.info('Running shell command "%s"', line) - adb.shellAsync(options.serial, line) - .then(function(out) { - var chunks = [] - out.on('data', function(chunk) { - chunks.push(chunk) - }) - out.on('end', function(chunk) { - push.send([id, options.serial, 'OKY', Buffer.concat(chunks)]) - }) + case wire.MessageType.PROBE: + var message = wire.ProbeMessage.decode(wrapper.message) + adb.getPropertiesAsync(options.serial) + .then(function(properties) { + identity = devutil.makeIdentity(options.serial, properties) + push.send([channel, + wireutil.makeDevicePropertiesMessage(options.serial, properties)]) }) - .catch(function(err) { - push.send([id, options.serial, 'ERR', err.message]) - }) - break - default: - log.warn('Unknown command "%s"', cmd) break } }) - // Respond to messages directed to everyone - sub.subscribe('ALL') - // Output var push = zmq.socket('push') options.endpoints.push.forEach(function(endpoint) { @@ -58,11 +77,10 @@ module.exports = function(options) { push.connect(endpoint) }) - // Introduce worker - // push.send(['HELO', options.serial]) - - // Adb - var adb = Promise.promisifyAll(adbkit.createClient()) + function poke() { + push.send([wireutil.global, + wireutil.makeDevicePokeMessage(options.serial, solo)]) + } function gracefullyExit() { log.info('Bye') @@ -76,4 +94,6 @@ module.exports = function(options) { process.on('SIGTERM', function() { gracefullyExit() }) + + poke() } diff --git a/lib/roles/provider.js b/lib/roles/provider.js index fb1e910e..d343611e 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -1,9 +1,15 @@ +var path = require('path') + var adb = require('adbkit') var Promise = require('bluebird') var zmq = require('zmq') +var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) + module.exports = function(options) { - var log = require('../util/logger').createLogger('provider') + var log = logger.createLogger('provider') var client = Promise.promisifyAll(adb.createClient()) var workers = Object.create(null) @@ -43,6 +49,11 @@ module.exports = function(options) { }) }) + function pushDeviceStatus(device, type) { + push.send([wireutil.global, + wireutil.makeDeviceStatusMessage(device.id, type)]) + } + function isWantedDevice(device) { return options.filter ? options.filter(device) : true } @@ -63,6 +74,7 @@ module.exports = function(options) { function maybeConnect(device) { if (isConnectable(device) && !isConnected(device)) { + pushDeviceStatus(device, device.type) log.info('Spawning worker for device "%s"', device.id) var proc = options.fork(device) proc.on('error', function(err) { @@ -100,6 +112,7 @@ module.exports = function(options) { function maybeDisconnect(device) { if (isConnected(device)) { + pushDeviceStatus(device, 'absent') log.info('Releasing worker of %s', device.id) gracefullyKillWorker(device.id, function() { /* noop */ }) return true diff --git a/lib/util/devutil.js b/lib/util/devutil.js new file mode 100644 index 00000000..1f13541d --- /dev/null +++ b/lib/util/devutil.js @@ -0,0 +1,33 @@ +module.exports = { + makeIdentity: function(serial, properties) { + var model = properties['ro.product.model'] + , brand = properties['ro.product.brand'] + , manufacturer = properties['ro.product.manufacturer'] + , version = properties['ro.build.version.release'] + , sdk = +properties['ro.build.version.sdk'] + , abi = properties['ro.product.cpu.abi'] + + // Remove brand prefix for consistency + if (model.substr(0, brand.length) === brand) { + model = model.substr(brand.length) + } + + // Remove manufacturer prefix for consistency + if (model.substr(0, manufacturer.length) === manufacturer) { + model = model.substr(manufacturer.length) + } + + // Clean up remaining model name + model = model.replace(/[_ ]/g, '') + + return { + platform: 'android' + , serial: serial + , manufacturer: manufacturer + , model: model + , version: version + , sdk: sdk + , abi: abi + } + } +} diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js new file mode 100644 index 00000000..59a14d57 --- /dev/null +++ b/lib/util/wireutil.js @@ -0,0 +1,95 @@ +var semver = require('semver') +var minimatch = require('minimatch') +var uuid = require('node-uuid') + +module.exports = function(wire) { + var wireutil = { + global: '*ALL' + , makePrivateChannel: function() { + return uuid.v4(null, new Buffer(16)).toString('base64') + } + , toDeviceStatus: function(type) { + return wire.DeviceStatus[{ + device: 'ONLINE' + , emulator: 'ONLINE' + , unauthorized: 'UNAUTHORIZED' + , offline: 'OFFLINE' + , absent: 'ABSENT' + }[type]] + } + , toDeviceType: function(type) { + return wire.DeviceStatus[{ + device: 'PHYSICAL' + , emulator: 'VIRTUAL' + }[type]] + } + , matchesRequirements: function(capabilities, requirements) { + return requirements.every(function(req) { + var capability = capabilities[req.name] + + if (!capability) { + return false + } + + switch (req.type) { + case wire.RequirementType.SEMVER: + if (!semver.satisfies(capability, req.value)) { + return false + } + break + case wire.RequirementType.GLOB: + if (!minimatch(capability, req.value)) { + return false + } + break + case wire.RequirementType.EXACT: + if (capability !== req.value) { + return false + } + break + default: + return false + } + }) + } + , envelope: function(type, message) { + return new wire.Envelope(type, message.encode()).encodeNB() + } + , makeJoinGroupMessage: function(serial) { + var message = new wire.JoinGroupMessage(serial) + return wireutil.envelope(wire.MessageType.JOIN_GROUP, message) + } + , makeLeaveGroupMessage: function(serial) { + var message = new wire.LeaveGroupMessage(serial) + return wireutil.envelope(wire.MessageType.LEAVE_GROUP, message) + } + , makeDevicePokeMessage: function(serial, channel) { + var message = new wire.DevicePokeMessage(serial, channel) + return wireutil.envelope(wire.MessageType.DEVICE_POKE, message) + } + , makeDevicePropertiesMessage: function(serial, properties) { + var message = new wire.DevicePropertiesMessage( + serial + , Object.keys(properties).map(function(key) { + return new wire.DeviceProperty(key, properties[key]) + }) + ) + + return wireutil.envelope(wire.MessageType.DEVICE_PROPERTIES, message) + } + , makeDeviceStatusMessage: function(serial, type) { + var message = new wire.DeviceStatusMessage( + serial + , wireutil.toDeviceStatus(type) + ) + + return wireutil.envelope(wire.MessageType.DEVICE_STATUS, message) + } + , makeProbeMessage: function() { + var message = new wire.ProbeMessage() + return wireutil.envelope(wire.MessageType.PROBE, message) + } + } + + return wireutil +} diff --git a/lib/wire/channelmanager.js b/lib/wire/channelmanager.js new file mode 100644 index 00000000..f7c15c8a --- /dev/null +++ b/lib/wire/channelmanager.js @@ -0,0 +1,51 @@ +var events = require('events') +var util = require('util') + +function ChannelManager() { + events.EventEmitter.call(this) + this.channels = Object.create(null) +} + +util.inherits(ChannelManager, events.EventEmitter) + +ChannelManager.prototype.register = function(id, timeout) { + this.channels[id] = { + timeout: timeout + , lastActivity: Date.now() + , timer: null + } + + // Set timer with initial check + this.check(id) +} + +ChannelManager.prototype.unregister = function(id) { + var channel = this.channels[id] + delete this.channels[id] + clearInterval(channel.timer) +} + +ChannelManager.prototype.keepalive = function(id) { + var channel = this.channels[id] + if (channel) { + channel.lastActivity = Date.now() + } +} + +ChannelManager.prototype.check = function(id) { + var channel = this.channels[id] + , inactivePeriod = Date.now() - channel.lastActivity + + if (inactivePeriod >= channel.timeout) { + this.unregister(id) + this.emit('timeout', id) + } + else if (channel.timeout < Infinity) { + channel.timer = setTimeout( + this.check.bind(this, id) + , channel.timeout - inactivePeriod + ) + } +} + +module.exports = ChannelManager diff --git a/lib/wire/index.js b/lib/wire/index.js new file mode 100644 index 00000000..d82bc80d --- /dev/null +++ b/lib/wire/index.js @@ -0,0 +1,6 @@ +var path = require('path') + +var ProtoBuf = require('protobufjs') + +module.exports = + ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build() diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto new file mode 100644 index 00000000..59703383 --- /dev/null +++ b/lib/wire/wire.proto @@ -0,0 +1,86 @@ +// Message wrapper + +enum MessageType { + DEVICE_POKE = 1; + DEVICE_STATUS = 2; + DEVICE_TYPE = 3; + DEVICE_PROPERTIES = 4; + GROUP = 5; + JOIN_GROUP = 6; + LEAVE_GROUP = 7; + PROBE = 8; +} + +message Envelope { + required MessageType type = 1; + required bytes message = 2; +} + +// Introductions + +message DevicePokeMessage { + required string serial = 1; + required string channel = 2; +} + +message ProbeMessage { +} + +enum DeviceStatus { + ABSENT = 1; + OFFLINE = 2; + UNAUTHORIZED = 3; + ONLINE = 4; +} + +message DeviceStatusMessage { + required string serial = 1; + required DeviceStatus status = 2; +} + +enum DeviceType { + PHYSICAL = 1; + VIRTUAL = 2; +} + +message DeviceTypeMessage { + required string serial = 1; + required DeviceType type = 2; +} + +message DeviceProperty { + required string name = 1; + required string value = 2; +} + +message DevicePropertiesMessage { + required string serial = 1; + repeated DeviceProperty properties = 2; +} + +// Grouping + +enum RequirementType { + SEMVER = 1; + GLOB = 2; + EXACT = 3; +} + +message DeviceRequirement { + required string name = 1; + required string value = 2; + required RequirementType type = 3; +} + +message GroupMessage { + required string group = 1; + repeated DeviceRequirement requirements = 2; +} + +message JoinGroupMessage { + required string serial = 1; +} + +message LeaveGroupMessage { + required string serial = 1; +}