diff --git a/lib/cli.js b/lib/cli.js index ec6a78a8..4f1d576f 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -50,7 +50,7 @@ program .option('--heartbeat-interval ' , 'heartbeat interval' , Number - , 10000) + , 5000) .option('--adb-host ' , 'ADB host (defaults to 127.0.0.1)' , String @@ -80,7 +80,6 @@ program require('./units/provider')({ name: options.name , killTimeout: 10000 - , heartbeatInterval: options.heartbeatInterval , ports: cliutil.range(options.minPort, options.maxPort) , filter: function(device) { return serials.length === 0 || serials.indexOf(device.id) !== -1 @@ -100,6 +99,7 @@ program , '--adb-host', options.adbHost , '--adb-port', options.adbPort , '--screen-ws-url-pattern', options.screenWsUrlPattern + , '--heartbeat-interval', options.heartbeatInterval ]) } , endpoints: { @@ -152,6 +152,10 @@ program , 'screen WebSocket URL pattern' , String , 'ws://${publicIp}:${publicPort}') + .option('--heartbeat-interval ' + , 'heartbeat interval' + , Number + , 5000) .action(function(serial, options) { if (!options.connectSub) { this.missingArgument('--connect-sub') @@ -187,6 +191,7 @@ program , screenWsUrlPattern: options.screenWsUrlPattern , screenPort: options.screenPort , connectPort: options.connectPort + , heartbeatInterval: options.heartbeatInterval }) }) @@ -222,21 +227,20 @@ program .option('-p, --connect-push ' , 'push endpoint' , cliutil.list) + .option('-s, --connect-sub ' + , 'sub endpoint' + , cliutil.list) .option('-t, --heartbeat-timeout ' , 'consider devices with heartbeat older than this value dead' , Number - , 20000) - .option('-i, --reap-interval ' - , 'reap interval' - , Number , 10000) .action(function(name, options) { require('./units/reaper')({ name: name , heartbeatTimeout: options.heartbeatTimeout - , reapInterval: options.reapInterval , endpoints: { push: options.connectPush + , sub: options.connectSub } }) }) @@ -925,6 +929,7 @@ program , procutil.fork(__filename, [ 'reaper', 'reaper001' , '--connect-push', options.bindDevPull + , '--connect-sub', options.bindAppPub ]) // provider diff --git a/lib/db/api.js b/lib/db/api.js index 14075293..6b665104 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -128,7 +128,6 @@ dbapi.saveDevice = function(serial, device) { , status: device.status , ready: false , statusChangedAt: r.now() - , lastHeartbeatAt: r.now() , reverseForwards: [] } return db.run(r.table('devices').get(serial).update(data)) @@ -164,8 +163,6 @@ dbapi.unsetDeviceOwner = function(serial) { dbapi.setDeviceAbsent = function(serial) { return db.run(r.table('devices').get(serial).update({ present: false - , ready: false - , lastHeartbeatAt: null })) } @@ -263,33 +260,14 @@ dbapi.loadDevices = function() { return db.run(r.table('devices')) } +dbapi.loadPresentDevices = function() { + return db.run(r.table('devices').getAll(true, { + index: 'present' + })) +} + dbapi.loadDevice = function(serial) { return db.run(r.table('devices').get(serial)) } -dbapi.updateProviderHeartbeat = function(channel) { - return db.run( - r.table('devices').getAll(channel, { - index: 'providerChannel' - }) - .update({ - lastHeartbeatAt: r.now() - }) - , { - noreply: true - , durability: 'soft' - } - ) -} - -dbapi.getDeadDevices = function(timeout) { - return db.run( - r.table('devices') - .between(null, r.now().sub(timeout / 1000), { - index: 'lastHeartbeatAt' - }) - .pluck('serial') - ) -} - module.exports = dbapi diff --git a/lib/db/tables.js b/lib/db/tables.js index 8c0d3133..fa843c12 100644 --- a/lib/db/tables.js +++ b/lib/db/tables.js @@ -26,7 +26,7 @@ module.exports = { ) } } - , lastHeartbeatAt: null + , present: null , providerChannel: { indexFunction: function(device) { return device('provider')('channel') diff --git a/lib/units/device/index.js b/lib/units/device/index.js index d2a83b91..e22ab6ed 100644 --- a/lib/units/device/index.js +++ b/lib/units/device/index.js @@ -16,6 +16,7 @@ module.exports = function(options) { var log = logger.createLogger('device') log.info('Preparing device') return syrup.serial() + .dependency(require('./plugins/heartbeat')) .dependency(require('./plugins/solo')) .dependency(require('./plugins/screen/stream')) .dependency(require('./plugins/screen/capture')) @@ -38,7 +39,7 @@ module.exports = function(options) { .dependency(require('./plugins/ringer')) .dependency(require('./plugins/wifi')) .dependency(require('./plugins/sd')) - .define(function(options, solo) { + .define(function(options, heartbeat, solo) { if (process.send) { // Only if we have a parent process process.send('ready') diff --git a/lib/units/device/plugins/heartbeat.js b/lib/units/device/plugins/heartbeat.js new file mode 100644 index 00000000..0e585f8c --- /dev/null +++ b/lib/units/device/plugins/heartbeat.js @@ -0,0 +1,24 @@ +var syrup = require('stf-syrup') + +var lifecycle = require('../../../util/lifecycle') +var wire = require('../../../wire') +var wireutil = require('../../../wire/util') + +module.exports = syrup.serial() + .dependency(require('../support/push')) + .define(function(options, push) { + function beat() { + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceHeartbeatMessage( + options.serial + )) + ]) + } + + var timer = setInterval(beat, options.heartbeatInterval) + + lifecycle.observe(function() { + clearInterval(timer) + }) + }) diff --git a/lib/units/processor/index.js b/lib/units/processor/index.js index a82bbe11..216fd44d 100644 --- a/lib/units/processor/index.js +++ b/lib/units/processor/index.js @@ -53,10 +53,6 @@ module.exports = function(options) { }) devDealer.on('message', wirerouter() - // Provider messages - .on(wire.ProviderHeartbeatMessage, function(channel, message) { - dbapi.updateProviderHeartbeat(message.channel) - }) // Initial device message .on(wire.DevicePresentMessage, function(channel, message, data) { dbapi.saveDevice(message.serial, message) @@ -79,6 +75,9 @@ module.exports = function(options) { dbapi.saveDeviceStatus(message.serial, message.status) appDealer.send([channel, data]) }) + .on(wire.DeviceHeartbeatMessage, function(channel, message, data) { + appDealer.send([channel, data]) + }) // Worker initialized .on(wire.DevicePokeMessage, function(channel, message) { dbapi.setDeviceChannel(message.serial, message.channel) diff --git a/lib/units/provider/index.js b/lib/units/provider/index.js index f8db0dda..aedef773 100644 --- a/lib/units/provider/index.js +++ b/lib/units/provider/index.js @@ -126,6 +126,7 @@ module.exports = function(options) { return false } if (options.filter && !options.filter(device)) { + log.info('Filtered out device "%s"', device.id) return false } return listener(device) @@ -407,19 +408,6 @@ module.exports = function(options) { lifecycle.share('Tracker', tracker) }) - // This keeps the devices "present" in the database. It relies on the - // provider channel changing on every run so that we never match old - // devices. - ;(function heartbeat() { - push.send([ - wireutil.heartbeat - , wireutil.envelope(new wire.ProviderHeartbeatMessage( - solo - )) - ]) - setTimeout(heartbeat, options.heartbeatInterval) - })() - lifecycle.observe(function() { [push, sub].forEach(function(sock) { try { diff --git a/lib/units/reaper/index.js b/lib/units/reaper/index.js index a285cb44..5a9047ce 100644 --- a/lib/units/reaper/index.js +++ b/lib/units/reaper/index.js @@ -4,18 +4,43 @@ var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') var wireutil = require('../../wire/util') +var wirerouter = require('../../wire/router') var dbapi = require('../../db/api') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') +var TtlSet = require('./util/ttlset') + module.exports = function(options) { var log = logger.createLogger('reaper') - , timer + var ttlset = new TtlSet(options.heartbeatTimeout) if (options.name) { logger.setGlobalIdentifier(options.name) } + // Input + var sub = zmq.socket('sub') + Promise.map(options.endpoints.sub, function(endpoint) { + return srv.resolve(endpoint).then(function(records) { + return srv.attempt(records, function(record) { + log.info('Receiving input from "%s"', record.url) + sub.connect(record.url) + return Promise.resolve(true) + }) + }) + }) + .catch(function(err) { + log.fatal('Unable to connect to sub endpoint', err) + lifecycle.fatal() + }) + + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + // Output var push = zmq.socket('push') Promise.map(options.endpoints.push, function(endpoint) { @@ -32,39 +57,55 @@ module.exports = function(options) { lifecycle.fatal() }) - function reap() { - dbapi.getDeadDevices(options.heartbeatTimeout) - .then(function(cursor) { - return Promise.promisify(cursor.toArray, cursor)() - .then(function(list) { - list.forEach(function(device) { - log.info('Reaping device "%s"', device.serial) - push.send([ - wireutil.global - , wireutil.envelope(new wire.DeviceAbsentMessage( - device.serial - )) - ]) - }) - }) - }) - .catch(function(err) { - log.error('Failed to load device list: ', err.message, err.stack) - lifecycle.fatal() - }) - } + sub.on('message', wirerouter() + .on(wire.DevicePresentMessage, function(channel, message) { + ttlset.bump(message.serial, Date.now()) + }) + .on(wire.DeviceHeartbeatMessage, function(channel, message) { + ttlset.bump(message.serial, Date.now()) + }) + .on(wire.DeviceAbsentMessage, function(channel, message) { + ttlset.drop(message.serial) + }) + .handler()) + + ttlset.on('timeout', function(serial) { + log.info('Reaping device "%s" due to heartbeat timeout', serial) + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceAbsentMessage( + serial + )) + ]) + }) + + dbapi.loadPresentDevices() + .then(function(cursor) { + return Promise.promisify(cursor.toArray, cursor)() + .then(function(list) { + var now = Date.now() + list.forEach(function(device) { + ttlset.bump(device.serial, now) + }) + }) + }) + .catch(function(err) { + log.error('Failed to load device list: ', err.stack) + lifecycle.fatal() + }) - timer = setInterval(reap, options.reapInterval) log.info('Reaping devices with no heartbeat') lifecycle.observe(function() { - clearTimeout(timer) + [push, sub].forEach(function(sock) { + try { + sock.close() + } + catch (err) { + // No-op + } + }) - try { - push.close() - } - catch (err) { - // No-op - } + ttlset.stop() }) } diff --git a/lib/units/reaper/util/ttlset.js b/lib/units/reaper/util/ttlset.js new file mode 100644 index 00000000..d6fd3917 --- /dev/null +++ b/lib/units/reaper/util/ttlset.js @@ -0,0 +1,108 @@ +var util = require('util') + +var EventEmitter = require('eventemitter3').EventEmitter + +function TtlItem(value) { + this.next = null + this.prev = null + this.time = null + this.value = value +} + +function TtlSet(ttl) { + this.head = null + this.tail = null + this.mapping = Object.create(null) + this.ttl = ttl + this.timer = null +} + +util.inherits(TtlSet, EventEmitter) + +TtlSet.prototype.bump = function(value, time) { + var item = this._remove(this.mapping[value]) || this._create(value) + + item.time = time || Date.now() + item.prev = this.tail + + this.tail = item + + if (!this.head) { + this.head = item + this._scheduleCheck() + } +} + +TtlSet.prototype.drop = function(value) { + this._drop(this.mapping[value]) +} + +TtlSet.prototype.stop = function() { + clearTimeout(this.timer) +} + +TtlSet.prototype._scheduleCheck = function() { + clearTimeout(this.timer) + if (this.head) { + var delay = Math.max(0, this.ttl - (Date.now() - this.head.time)) + this.timer = setTimeout(this._check.bind(this), delay) + } +} + +TtlSet.prototype._check = function() { + var now = Date.now() + + var item + while ((item = this.head)) { + if (now - item.time > this.ttl) { + this._drop(item) + this.emit('timeout', item.value) + } + else { + break + } + } + + this._scheduleCheck() +} + +TtlSet.prototype._create = function(value) { + var item = new TtlItem(value) + this.mapping[value] = item + return item +} + +TtlSet.prototype._drop = function(item) { + if (item) { + this._remove(item) + delete this.mapping[item.value] + } +} + +TtlSet.prototype._remove = function(item) { + if (!item) { + return null + } + + if (item.prev) { + item.prev.next = item.next + } + + if (item.next) { + item.next.prev = item.prev + } + + if (item === this.head) { + this.head = item.next + } + + if (item === this.tail) { + this.tail = item.prev + } + + item.next = item.prev = null + + return item +} + +module.exports = TtlSet diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 53e162f9..2d0e3825 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -56,7 +56,7 @@ enum MessageType { RotationEvent = 48; StoreOpenMessage = 49; ScreenCaptureMessage = 50; - ProviderHeartbeatMessage = 51; + DeviceHeartbeatMessage = 73; RebootMessage = 52; ConnectStartMessage = 53; ConnectStopMessage = 54; @@ -113,8 +113,8 @@ message ProviderMessage { required string name = 2; } -message ProviderHeartbeatMessage { - required string channel = 1; +message DeviceHeartbeatMessage { + required string serial = 1; } message DevicePresentMessage {