1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 10:19:30 +02:00

Send heartbeats from the provider. This keeps unauthorized and offline devices present as well, which is what we want. This also greatly reduces the number of queries to rethinkdb.

This commit is contained in:
Simo Kinnunen 2014-06-02 14:51:26 +09:00
parent 2dda82253d
commit bebb3af42a
8 changed files with 37 additions and 40 deletions

View file

@ -46,6 +46,10 @@ program
.option('-r, --storage-url <url>'
, 'URL to storage client'
, String)
.option('--heartbeat-interval <ms>'
, 'heartbeat interval'
, Number
, 10000)
.action(function() {
var serials = cliutil.allUnknownArgs(arguments)
, options = cliutil.lastArg(arguments)
@ -63,6 +67,7 @@ program
require('./roles/provider')({
name: options.name
, killTimeout: 10000
, heartbeatInterval: options.heartbeatInterval
, ports: cliutil.range(options.minPort, options.maxPort)
, filter: function(device) {
return serials.length === 0 || serials.indexOf(device.id) !== -1
@ -106,10 +111,6 @@ program
, 'public ip for global access'
, String
, ip())
.option('--heartbeat-interval <ms>'
, 'heartbeat interval'
, Number
, 10000)
.option('-t, --group-timeout <seconds>'
, 'group timeout'
, Number
@ -143,7 +144,6 @@ program
sub: options.connectSub
, push: options.connectPush
}
, heartbeatInterval: options.heartbeatInterval
, groupTimeout: options.groupTimeout * 1000 // change to ms
, storageUrl: options.storageUrl
})

View file

@ -208,11 +208,14 @@ dbapi.loadDevice = function(serial) {
return db.run(r.table('devices').get(serial))
}
dbapi.updateDeviceHeartbeat = function(serial) {
dbapi.updateProviderHeartbeat = function(channel) {
return db.run(
r.table('devices').get(serial).update({
lastHeartbeatAt: r.now()
})
r.table('devices').getAll(channel, {
index: 'providerChannel'
})
.update({
lastHeartbeatAt: r.now()
})
, {
noreply: true
, durability: 'soft'

View file

@ -15,6 +15,9 @@ module.exports = {
)
}
, lastHeartbeatAt: null
, providerChannel: function(device) {
return device('provider')('channel')
}
}
}
, logs: {

View file

@ -17,7 +17,6 @@ module.exports = function(options) {
log.info('Preparing device')
return syrup.serial()
.dependency(require('./device/plugins/solo'))
.dependency(require('./device/plugins/heartbeat'))
.dependency(require('./device/plugins/display'))
.dependency(require('./device/plugins/screenshot'))
.dependency(require('./device/plugins/http'))

View file

@ -1,20 +0,0 @@
var syrup = require('syrup')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
module.exports = syrup.serial()
.dependency(require('../support/push'))
.define(function(options, push) {
function heartbeat() {
push.send([
wireutil.heartbeat
, wireutil.envelope(new wire.DeviceHeartbeatMessage(
options.serial
))
])
setTimeout(heartbeat, options.heartbeatInterval)
}
heartbeat()
})

View file

@ -33,6 +33,10 @@ module.exports = function(options) {
})
devDealer.on('message', wirerouter()
// Provider messages
.on(wire.ProviderHeartbeatMessage, function(channel, message) {
dbapi.updateProviderHeartbeat(message.channel)
})
// Initial device message
.on(wire.DevicePresentMessage, function(channel, message, data) {
dbapi.saveDevice(message.serial, message)
@ -66,9 +70,6 @@ module.exports = function(options) {
})
})
// Worker messages
.on(wire.DeviceHeartbeatMessage, function(channel, message) {
dbapi.updateDeviceHeartbeat(message.serial)
})
.on(wire.JoinGroupMessage, function(channel, message, data) {
dbapi.setDeviceOwner(message.serial, message.owner)
appDealer.send([channel, data])

View file

@ -362,6 +362,19 @@ module.exports = function(options) {
lifecycle.share('Tracker', tracker)
})
// This keeps the devices "present" in the database. It relies on the
// provider channel changing on every run so that we never match old
// devices.
;(function heartbeat() {
push.send([
wireutil.heartbeat
, wireutil.envelope(new wire.ProviderHeartbeatMessage(
solo
))
])
setTimeout(heartbeat, options.heartbeatInterval)
})()
lifecycle.observe(function() {
clearTimeout(totalsTimer)

View file

@ -3,7 +3,6 @@
enum MessageType {
CopyMessage = 33;
DeviceAbsentMessage = 1;
DeviceHeartbeatMessage = 28;
DeviceIdentityMessage = 2;
DeviceLogcatEntryMessage = 3;
DeviceLogMessage = 4;
@ -51,6 +50,7 @@ enum MessageType {
RotationEvent = 48;
StoreOpenMessage = 49;
ScreenCaptureMessage = 50;
ProviderHeartbeatMessage = 51;
}
message Envelope {
@ -74,12 +74,6 @@ message TransactionDoneMessage {
optional string body = 5;
}
// Heartbeat
message DeviceHeartbeatMessage {
required string serial = 1;
}
// Logging
message DeviceLogMessage {
@ -99,6 +93,10 @@ message ProviderMessage {
required string name = 2;
}
message ProviderHeartbeatMessage {
required string channel = 1;
}
message DevicePresentMessage {
required string serial = 1;
required DeviceStatus status = 2;