diff --git a/lib/cli.js b/lib/cli.js index ea667df5..fbb4665d 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -46,6 +46,10 @@ program .option('-r, --storage-url ' , 'URL to storage client' , String) + .option('--heartbeat-interval ' + , 'heartbeat interval' + , Number + , 10000) .action(function() { var serials = cliutil.allUnknownArgs(arguments) , options = cliutil.lastArg(arguments) @@ -63,6 +67,7 @@ program require('./roles/provider')({ name: options.name , killTimeout: 10000 + , heartbeatInterval: options.heartbeatInterval , ports: cliutil.range(options.minPort, options.maxPort) , filter: function(device) { return serials.length === 0 || serials.indexOf(device.id) !== -1 @@ -106,10 +111,6 @@ program , 'public ip for global access' , String , ip()) - .option('--heartbeat-interval ' - , 'heartbeat interval' - , Number - , 10000) .option('-t, --group-timeout ' , 'group timeout' , Number @@ -143,7 +144,6 @@ program sub: options.connectSub , push: options.connectPush } - , heartbeatInterval: options.heartbeatInterval , groupTimeout: options.groupTimeout * 1000 // change to ms , storageUrl: options.storageUrl }) diff --git a/lib/db/api.js b/lib/db/api.js index d1e3216c..04972c49 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -208,11 +208,14 @@ dbapi.loadDevice = function(serial) { return db.run(r.table('devices').get(serial)) } -dbapi.updateDeviceHeartbeat = function(serial) { +dbapi.updateProviderHeartbeat = function(channel) { return db.run( - r.table('devices').get(serial).update({ - lastHeartbeatAt: r.now() - }) + r.table('devices').getAll(channel, { + index: 'providerChannel' + }) + .update({ + lastHeartbeatAt: r.now() + }) , { noreply: true , durability: 'soft' diff --git a/lib/db/tables.js b/lib/db/tables.js index 424ebb43..2e7d1ae7 100644 --- a/lib/db/tables.js +++ b/lib/db/tables.js @@ -15,6 +15,9 @@ module.exports = { ) } , lastHeartbeatAt: null + , providerChannel: function(device) { + return device('provider')('channel') + } } } , logs: { diff --git a/lib/roles/device.js b/lib/roles/device.js index cace32b8..58dad0d3 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -17,7 +17,6 @@ module.exports = function(options) { log.info('Preparing device') return syrup.serial() .dependency(require('./device/plugins/solo')) - .dependency(require('./device/plugins/heartbeat')) .dependency(require('./device/plugins/display')) .dependency(require('./device/plugins/screenshot')) .dependency(require('./device/plugins/http')) diff --git a/lib/roles/device/plugins/heartbeat.js b/lib/roles/device/plugins/heartbeat.js deleted file mode 100644 index 29c6f00f..00000000 --- a/lib/roles/device/plugins/heartbeat.js +++ /dev/null @@ -1,20 +0,0 @@ -var syrup = require('syrup') - -var wire = require('../../../wire') -var wireutil = require('../../../wire/util') - -module.exports = syrup.serial() - .dependency(require('../support/push')) - .define(function(options, push) { - function heartbeat() { - push.send([ - wireutil.heartbeat - , wireutil.envelope(new wire.DeviceHeartbeatMessage( - options.serial - )) - ]) - setTimeout(heartbeat, options.heartbeatInterval) - } - - heartbeat() - }) diff --git a/lib/roles/processor.js b/lib/roles/processor.js index f786e016..e283a398 100644 --- a/lib/roles/processor.js +++ b/lib/roles/processor.js @@ -33,6 +33,10 @@ module.exports = function(options) { }) devDealer.on('message', wirerouter() + // Provider messages + .on(wire.ProviderHeartbeatMessage, function(channel, message) { + dbapi.updateProviderHeartbeat(message.channel) + }) // Initial device message .on(wire.DevicePresentMessage, function(channel, message, data) { dbapi.saveDevice(message.serial, message) @@ -66,9 +70,6 @@ module.exports = function(options) { }) }) // Worker messages - .on(wire.DeviceHeartbeatMessage, function(channel, message) { - dbapi.updateDeviceHeartbeat(message.serial) - }) .on(wire.JoinGroupMessage, function(channel, message, data) { dbapi.setDeviceOwner(message.serial, message.owner) appDealer.send([channel, data]) diff --git a/lib/roles/provider.js b/lib/roles/provider.js index be4f05a4..fd0b3311 100644 --- a/lib/roles/provider.js +++ b/lib/roles/provider.js @@ -362,6 +362,19 @@ module.exports = function(options) { lifecycle.share('Tracker', tracker) }) + // This keeps the devices "present" in the database. It relies on the + // provider channel changing on every run so that we never match old + // devices. + ;(function heartbeat() { + push.send([ + wireutil.heartbeat + , wireutil.envelope(new wire.ProviderHeartbeatMessage( + solo + )) + ]) + setTimeout(heartbeat, options.heartbeatInterval) + })() + lifecycle.observe(function() { clearTimeout(totalsTimer) diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index d6df28b5..a54f02e7 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -3,7 +3,6 @@ enum MessageType { CopyMessage = 33; DeviceAbsentMessage = 1; - DeviceHeartbeatMessage = 28; DeviceIdentityMessage = 2; DeviceLogcatEntryMessage = 3; DeviceLogMessage = 4; @@ -51,6 +50,7 @@ enum MessageType { RotationEvent = 48; StoreOpenMessage = 49; ScreenCaptureMessage = 50; + ProviderHeartbeatMessage = 51; } message Envelope { @@ -74,12 +74,6 @@ message TransactionDoneMessage { optional string body = 5; } -// Heartbeat - -message DeviceHeartbeatMessage { - required string serial = 1; -} - // Logging message DeviceLogMessage { @@ -99,6 +93,10 @@ message ProviderMessage { required string name = 2; } +message ProviderHeartbeatMessage { + required string channel = 1; +} + message DevicePresentMessage { required string serial = 1; required DeviceStatus status = 2;