1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 02:09:32 +02:00

Reap dead devices using timeouts. Previously, if a provider died without being able to clean up properly, the device would stay as a ghost.

This commit is contained in:
Simo Kinnunen 2014-02-25 20:45:29 +09:00
parent f24e777961
commit 4896ca406e
10 changed files with 189 additions and 3 deletions

View file

@ -95,6 +95,10 @@ program
, 'public ip for global access' , 'public ip for global access'
, String , String
, ip()) , ip())
.option('--heartbeat-interval <ms>'
, 'heartbeat interval'
, Number
, 10000)
.action(function(serial, options) { .action(function(serial, options) {
if (!options.connectSub) { if (!options.connectSub) {
this.missingArgument('--connect-sub') this.missingArgument('--connect-sub')
@ -118,6 +122,7 @@ program
sub: options.connectSub sub: options.connectSub
, push: options.connectPush , push: options.connectPush
} }
, heartbeatInterval: options.heartbeatInterval
}) })
}) })
@ -147,6 +152,31 @@ program
}) })
}) })
// `reaper <name>`: starts the reaper role, which looks for devices whose
// heartbeat is older than --heartbeat-timeout (ms) every --reap-interval (ms)
// and announces them over the --connect-push endpoint(s).
program
  .command('reaper <name>')
  .description('start reaper')
  .option('-p, --connect-push <endpoint>'
    , 'push endpoint'
    , cliutil.list)
  .option('-t, --heartbeat-timeout <ms>'
    , 'consider devices with heartbeat older than this value dead'
    , Number
    , 20000)
  .option('-i, --reap-interval <ms>'
    , 'reap interval'
    , Number
    , 10000)
  .action(function(name, options) {
    // The role iterates over the push endpoints unconditionally, so report a
    // usage error here rather than failing later on an undefined list.
    if (!options.connectPush) {
      this.missingArgument('--connect-push')
    }

    require('./roles/reaper')({
      name: name
    , heartbeatTimeout: options.heartbeatTimeout
    , reapInterval: options.reapInterval
    , endpoints: {
        push: options.connectPush
      }
    })
  })
program program
.command('triproxy <name>') .command('triproxy <name>')
.description('start triproxy') .description('start triproxy')
@ -471,6 +501,15 @@ program
log.error('processor 002 died', err.stack) log.error('processor 002 died', err.stack)
}) })
// Fork the single reaper process ("reaper001"), pointing its push socket at
// the device-side pull endpoint; a rejection from the fork is only logged.
procutil
  .fork(__filename, [
    'reaper'
  , 'reaper001'
  , '--connect-push'
  , options.bindDevPull
  ])
  .catch(function(reason) {
    log.error('reaper 001 died', reason.stack)
  })
// provider // provider
procutil.fork(__filename, [ procutil.fork(__filename, [
'provider' 'provider'

View file

@ -7,6 +7,10 @@ var wireutil = require('../wire/util')
var dbapi = Object.create(null) var dbapi = Object.create(null)
// Closes the database connection by delegating to db.close(), handing
// `options` through unchanged. Returns whatever db.close() returns.
dbapi.close = function(options) {
return db.close(options)
}
dbapi.saveUserAfterLogin = function(user) { dbapi.saveUserAfterLogin = function(user) {
return db.run(r.table('users').get(user.email).update({ return db.run(r.table('users').get(user.email).update({
name: user.name name: user.name
@ -56,6 +60,7 @@ dbapi.saveDevice = function(serial, device) {
, status: device.status , status: device.status
, statusChangedAt: r.now() , statusChangedAt: r.now()
, createdAt: r.now() , createdAt: r.now()
, lastHeartbeatAt: r.now()
} }
, { , {
upsert: true upsert: true
@ -84,6 +89,7 @@ dbapi.unsetDeviceOwner = function(serial, owner) {
dbapi.setDeviceAbsent = function(serial) { dbapi.setDeviceAbsent = function(serial) {
return db.run(r.table('devices').get(serial).update({ return db.run(r.table('devices').get(serial).update({
present: false present: false
, lastHeartbeatAt: null
})) }))
} }
@ -114,4 +120,26 @@ dbapi.loadDevice = function(serial) {
return db.run(r.table('devices').get(serial)) return db.run(r.table('devices').get(serial))
} }
// Stamps the device identified by `serial` with the current database time in
// `lastHeartbeatAt`. The write is submitted with `noreply` and soft
// durability options.
// NOTE(review): db.run is declared as (q, options), but its visible body
// calls rutil.run(conn, q) - confirm these options actually reach the driver.
dbapi.updateDeviceHeartbeat = function(serial) {
  var touch = r.table('devices').get(serial).update({
    lastHeartbeatAt: r.now()
  })
  var runOptions = {
    noreply: true
  , durability: 'soft'
  }

  return db.run(touch, runOptions)
}
// Looks up devices whose `lastHeartbeatAt` index value lies below
// "now minus timeout" and returns only their `serial` field.
// `timeout` is given in milliseconds and is divided by 1000 before being
// subtracted from r.now().
dbapi.getDeadDevices = function(timeout) {
  var devices = r.table('devices')
  var cutoff = r.now().sub(timeout / 1000)
  var stale = devices.between(null, cutoff, {
    index: 'lastHeartbeatAt'
  })

  return db.run(stale.pluck('serial'))
}
module.exports = dbapi module.exports = dbapi

View file

@ -33,8 +33,15 @@ db.connect = (function() {
} }
})() })()
// Close connection, we don't really care if it hasn't been created yet or not.
// `options` is optional and is forwarded to rutil.close(), which substitutes
// an empty object when none is given; callers such as dbapi.close(options)
// previously had their options dropped here.
db.close = function(options) {
  return db.connect().then(function(conn) {
    return rutil.close(conn, options)
  })
}
// Small utility for running queries without having to acquire a connection // Small utility for running queries without having to acquire a connection
db.run = function(q) { db.run = function(q, options) {
return db.connect().then(function(conn) { return db.connect().then(function(conn) {
return rutil.run(conn, q) return rutil.run(conn, q)
}) })

View file

@ -1,3 +1,5 @@
var r = require('rethinkdb')
module.exports = { module.exports = {
users: { users: {
primaryKey: 'email' primaryKey: 'email'
@ -8,6 +10,7 @@ module.exports = {
ownerEmail: function(device) { ownerEmail: function(device) {
return device('owner')('email') return device('owner')('email')
} }
, lastHeartbeatAt: null
} }
} }
, logs: { , logs: {

View file

@ -558,6 +558,16 @@ module.exports = function(options) {
owner = null owner = null
} }
// Sends one DeviceHeartbeatMessage for this device's serial on the heartbeat
// channel, then re-arms itself to run again after options.heartbeatInterval.
function heartbeat() {
  var beat = new wire.DeviceHeartbeatMessage(options.serial)

  push.send([wireutil.heartbeat, wireutil.envelope(beat)])
  setTimeout(heartbeat, options.heartbeatInterval)
}
function selfDestruct() { function selfDestruct() {
process.exit(1) process.exit(1)
} }
@ -580,4 +590,6 @@ module.exports = function(options) {
process.on('SIGTERM', function() { process.on('SIGTERM', function() {
gracefullyExit() gracefullyExit()
}) })
heartbeat()
} }

View file

@ -65,6 +65,9 @@ module.exports = function(options) {
}) })
}) })
// Worker messages // Worker messages
// On each device heartbeat, refresh that device's lastHeartbeatAt in the
// database. The result of the update is not awaited or inspected here.
.on(wire.DeviceHeartbeatMessage, function(channel, message) {
dbapi.updateDeviceHeartbeat(message.serial)
})
.on(wire.JoinGroupMessage, function(channel, message, data) { .on(wire.JoinGroupMessage, function(channel, message, data) {
dbapi.setDeviceOwner(message.serial, message.owner) dbapi.setDeviceOwner(message.serial, message.owner)
appDealer.send([channel, data]) appDealer.send([channel, data])

73
lib/roles/reaper.js Normal file
View file

@ -0,0 +1,73 @@
var util = require('util')
var Promise = require('bluebird')
var zmq = require('zmq')
var logger = require('../util/logger')
var wire = require('../wire')
var wireutil = require('../wire/util')
var dbapi = require('../db/api')
module.exports = function(options) {
var log = logger.createLogger('reaper')
, quit = Promise.defer()
, timer
if (options.name) {
logger.setGlobalIdentifier(options.name)
}
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
log.info('Sending output to %s', endpoint)
push.connect(endpoint)
})
function reap() {
dbapi.getDeadDevices(options.heartbeatTimeout)
.then(function(cursor) {
return Promise.promisify(cursor.toArray, cursor)()
.then(function(list) {
list.forEach(function(device) {
log.info('Reaping device "%s"', device.serial)
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceAbsentMessage(
device.serial
))
])
})
})
})
.catch(function(err) {
log.error('Failed to load device list: ', err.message, err.stack)
quit.reject(err)
})
}
timer = setInterval(reap, options.reapInterval)
process.on('SIGINT', function() {
quit.resolve()
})
process.on('SIGTERM', function() {
quit.resolve()
})
quit.promise.finally(function() {
clearInterval(timer)
dbapi.close()
.timeout(10000)
.then(function() {
log.info('Quit after cleanup')
})
.catch(Promise.TimeoutError, function() {
log.error('Cleanup operation timed out, closing unsafely')
process.exit(1)
})
})
log.info('Reaping devices with no heartbeat')
}

View file

@ -10,8 +10,21 @@ module.exports.connect = function(options) {
return resolver.promise return resolver.promise
} }
module.exports.run = function(conn, q) { module.exports.close = function(conn, options) {
var resolver = Promise.defer() var resolver = Promise.defer()
q.run(conn, resolver.callback) if (!options) {
options = {}
}
conn.close(options, resolver.callback)
return resolver.promise
}
module.exports.run = function(conn, q, options) {
var resolver = Promise.defer()
if (!options) {
options = {}
}
options.connection = conn
q.run(options, resolver.callback)
return resolver.promise return resolver.promise
} }

View file

@ -5,6 +5,7 @@ var wire = require('./')
var wireutil = { var wireutil = {
global: '*ALL' global: '*ALL'
, log: '*LOG' , log: '*LOG'
, heartbeat: '*HB'
, makePrivateChannel: function() { , makePrivateChannel: function() {
return uuid.v4(null, new Buffer(16)).toString('base64') return uuid.v4(null, new Buffer(16)).toString('base64')
} }

View file

@ -2,6 +2,7 @@
enum MessageType { enum MessageType {
DeviceAbsentMessage = 1; DeviceAbsentMessage = 1;
DeviceHeartbeatMessage = 28;
DeviceIdentityMessage = 2; DeviceIdentityMessage = 2;
DeviceLogcatEntryMessage = 3; DeviceLogcatEntryMessage = 3;
DeviceLogMessage = 4; DeviceLogMessage = 4;
@ -49,6 +50,12 @@ message TransactionDoneMessage {
optional string data = 4; optional string data = 4;
} }
// Heartbeat
// Carries only the serial of the device the heartbeat is for.
message DeviceHeartbeatMessage {
required string serial = 1;
}
// Logging // Logging
message DeviceLogMessage { message DeviceLogMessage {