diff --git a/lib/cli.js b/lib/cli.js index f6272dce..5c57efbf 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -95,6 +95,10 @@ program , 'public ip for global access' , String , ip()) + .option('--heartbeat-interval ' + , 'heartbeat interval' + , Number + , 10000) .action(function(serial, options) { if (!options.connectSub) { this.missingArgument('--connect-sub') @@ -118,6 +122,7 @@ program sub: options.connectSub , push: options.connectPush } + , heartbeatInterval: options.heartbeatInterval }) }) @@ -147,6 +152,31 @@ program }) }) +program + .command('reaper ') + .description('start reaper') + .option('-p, --connect-push ' + , 'push endpoint' + , cliutil.list) + .option('-t, --heartbeat-timeout ' + , 'consider devices with heartbeat older than this value dead' + , Number + , 20000) + .option('-i, --reap-interval ' + , 'reap interval' + , Number + , 10000) + .action(function(name, options) { + require('./roles/reaper')({ + name: name + , heartbeatTimeout: options.heartbeatTimeout + , reapInterval: options.reapInterval + , endpoints: { + push: options.connectPush + } + }) + }) + program .command('triproxy ') .description('start triproxy') @@ -471,6 +501,15 @@ program log.error('processor 002 died', err.stack) }) + // reaper one + procutil.fork(__filename, [ + 'reaper', 'reaper001' + , '--connect-push', options.bindDevPull + ]) + .catch(function(err) { + log.error('reaper 001 died', err.stack) + }) + // provider procutil.fork(__filename, [ 'provider' diff --git a/lib/db/api.js b/lib/db/api.js index f09207f7..81892bdb 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -7,6 +7,10 @@ var wireutil = require('../wire/util') var dbapi = Object.create(null) +dbapi.close = function(options) { + return db.close(options) +} + dbapi.saveUserAfterLogin = function(user) { return db.run(r.table('users').get(user.email).update({ name: user.name @@ -56,6 +60,7 @@ dbapi.saveDevice = function(serial, device) { , status: device.status , statusChangedAt: r.now() , createdAt: r.now() + , lastHeartbeatAt: r.now() } , { upsert: true @@ -84,6 +89,7 @@ dbapi.unsetDeviceOwner = function(serial, owner) { dbapi.setDeviceAbsent = function(serial) { return db.run(r.table('devices').get(serial).update({ present: false + , lastHeartbeatAt: null })) } @@ -114,4 +120,26 @@ dbapi.loadDevice = function(serial) { return db.run(r.table('devices').get(serial)) } +dbapi.updateDeviceHeartbeat = function(serial) { + return db.run( + r.table('devices').get(serial).update({ + lastHeartbeatAt: r.now() + }) + , { + noreply: true + , durability: 'soft' + } + ) +} + +dbapi.getDeadDevices = function(timeout) { + return db.run( + r.table('devices') + .between(null, r.now().sub(timeout / 1000), { + index: 'lastHeartbeatAt' + }) + .pluck('serial') + ) +} + module.exports = dbapi diff --git a/lib/db/index.js b/lib/db/index.js index 5a77c072..4b01ae6c 100644 --- a/lib/db/index.js +++ b/lib/db/index.js @@ -33,8 +33,15 @@ db.connect = (function() { } })() +// Close connection, we don't really care if it hasn't been created yet or not +db.close = function() { + return db.connect().then(function(conn) { + return rutil.close(conn) + }) +} + // Small utility for running queries without having to acquire a connection -db.run = function(q) { +db.run = function(q, options) { return db.connect().then(function(conn) { return rutil.run(conn, q) }) diff --git a/lib/db/tables.js b/lib/db/tables.js index de66ac90..bf046061 100644 --- a/lib/db/tables.js +++ b/lib/db/tables.js @@ -1,3 +1,5 @@ +var r = require('rethinkdb') + module.exports = { users: { primaryKey: 'email' @@ -8,6 +10,7 @@ module.exports = { ownerEmail: function(device) { return device('owner')('email') } + , lastHeartbeatAt: null } } , logs: { diff --git a/lib/roles/device.js b/lib/roles/device.js index 4bc54667..1cb54c1e 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -558,6 +558,16 @@ module.exports = function(options) { owner = null } + function heartbeat() { + push.send([ + wireutil.heartbeat + , wireutil.envelope(new wire.DeviceHeartbeatMessage( + options.serial + )) + ]) + setTimeout(heartbeat, options.heartbeatInterval) + } + function selfDestruct() { process.exit(1) } @@ -580,4 +590,6 @@ module.exports = function(options) { process.on('SIGTERM', function() { gracefullyExit() }) + + heartbeat() } diff --git a/lib/roles/processor.js b/lib/roles/processor.js index 94cbf0be..0d768944 100644 --- a/lib/roles/processor.js +++ b/lib/roles/processor.js @@ -65,6 +65,9 @@ module.exports = function(options) { }) }) // Worker messages + .on(wire.DeviceHeartbeatMessage, function(channel, message) { + dbapi.updateDeviceHeartbeat(message.serial) + }) .on(wire.JoinGroupMessage, function(channel, message, data) { dbapi.setDeviceOwner(message.serial, message.owner) appDealer.send([channel, data]) diff --git a/lib/roles/reaper.js b/lib/roles/reaper.js new file mode 100644 index 00000000..11cdfead --- /dev/null +++ b/lib/roles/reaper.js @@ -0,0 +1,73 @@ +var util = require('util') + +var Promise = require('bluebird') +var zmq = require('zmq') + +var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../wire/util') +var dbapi = require('../db/api') + +module.exports = function(options) { + var log = logger.createLogger('reaper') + , quit = Promise.defer() + , timer + + if (options.name) { + logger.setGlobalIdentifier(options.name) + } + + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + + function reap() { + dbapi.getDeadDevices(options.heartbeatTimeout) + .then(function(cursor) { + return Promise.promisify(cursor.toArray, cursor)() + .then(function(list) { + list.forEach(function(device) { + log.info('Reaping device "%s"', device.serial) + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceAbsentMessage( + device.serial + )) + ]) + }) + }) + }) + .catch(function(err) { + log.error('Failed to load device list: ', err.message, err.stack) + quit.reject(err) + }) + } + + timer = setInterval(reap, options.reapInterval) + + process.on('SIGINT', function() { + quit.resolve() + }) + + process.on('SIGTERM', function() { + quit.resolve() + }) + + quit.promise.finally(function() { + clearInterval(timer) + dbapi.close() + .timeout(10000) + .then(function() { + log.info('Quit after cleanup') + }) + .catch(Promise.TimeoutError, function() { + log.error('Cleanup operation timed out, closing unsafely') + process.exit(1) + }) + }) + + log.info('Reaping devices with no heartbeat') +} diff --git a/lib/util/rutil.js b/lib/util/rutil.js index a3d5c990..79544d7d 100644 --- a/lib/util/rutil.js +++ b/lib/util/rutil.js @@ -10,8 +10,21 @@ module.exports.connect = function(options) { return resolver.promise } -module.exports.run = function(conn, q) { +module.exports.close = function(conn, options) { var resolver = Promise.defer() - q.run(conn, resolver.callback) + if (!options) { + options = {} + } + conn.close(options, resolver.callback) + return resolver.promise +} + +module.exports.run = function(conn, q, options) { + var resolver = Promise.defer() + if (!options) { + options = {} + } + options.connection = conn + q.run(options, resolver.callback) return resolver.promise } diff --git a/lib/wire/util.js b/lib/wire/util.js index 53cc1976..d755482f 100644 --- a/lib/wire/util.js +++ b/lib/wire/util.js @@ -5,6 +5,7 @@ var wire = require('./') var wireutil = { global: '*ALL' , log: '*LOG' +, heartbeat: '*HB' , makePrivateChannel: function() { return uuid.v4(null, new Buffer(16)).toString('base64') } diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index ac40d479..8866ea77 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -2,6 +2,7 @@ enum MessageType { DeviceAbsentMessage = 1; + DeviceHeartbeatMessage = 28; DeviceIdentityMessage = 2; DeviceLogcatEntryMessage = 3; DeviceLogMessage = 4; @@ -49,6 +50,12 @@ message TransactionDoneMessage { optional string data = 4; } +// Heartbeat + +message DeviceHeartbeatMessage { + required string serial = 1; +} + // Logging message DeviceLogMessage {