1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 18:29:17 +02:00

Save device logs to rethinkdb.

This commit is contained in:
Simo Kinnunen 2014-01-29 19:50:47 +09:00
parent 39989c6e7a
commit 389db73f08
8 changed files with 140 additions and 41 deletions

View file

@ -13,6 +13,17 @@ module.exports.saveUserAfterLogin = function(user) {
})) }))
} }
module.exports.saveDeviceLog = function(serial, entry) {
return db.run(r.table('logs').insert({
serial: entry.serial
, timestamp: r.epochTime(entry.timestamp)
, priority: entry.priority
, tag: entry.tag
, pid: entry.pid
, message: entry.message
}))
}
module.exports.saveDeviceStatus = function(serial, status) { module.exports.saveDeviceStatus = function(serial, status) {
return db.run(r.table('devices').get(serial).update({ return db.run(r.table('devices').get(serial).update({
status: status status: status

View file

@ -11,28 +11,64 @@ var tables = require('./tables')
module.exports = function(conn) { module.exports = function(conn) {
var log = logger.createLogger('db:setup') var log = logger.createLogger('db:setup')
function alreadyExistsError(err) {
return err.msg && err.msg.indexOf('already exists') !== -1
}
function noMasterAvailableError(err) {
return err.msg && err.msg.indexOf('No master available') !== -1
}
function createDatabase() { function createDatabase() {
return rutil.run(conn, r.dbCreate(conn.db)) return rutil.run(conn, r.dbCreate(conn.db))
.then(function() { .then(function() {
log.info('Database "%s" created', conn.db) log.info('Database "%s" created', conn.db)
}) })
.catch(rutil.errors.RqlRuntimeError, function(err) { .catch(alreadyExistsError, function(err) {
var expected = util.format('Database `%s` already exists.', conn.db)
assert.equal(expected, err.msg)
log.info('Database "%s" already exists', conn.db) log.info('Database "%s" already exists', conn.db)
return Promise.resolve()
}) })
} }
function createTable(table, options) { function createTable(table, options) {
return rutil.run(conn, r.tableCreate(table, options)) var tableOptions = {
primaryKey: options.primaryKey
}
return rutil.run(conn, r.tableCreate(table, tableOptions))
.then(function() { .then(function() {
log.info('Table "%s" created', table) log.info('Table "%s" created', table)
})
.catch(alreadyExistsError, function(err) {
log.info('Table "%s" already exists', table)
return Promise.resolve() return Promise.resolve()
}) })
.catch(rutil.errors.RqlRuntimeError, function(err) { .catch(noMasterAvailableError, function(err) {
var expected = util.format('Table `%s` already exists.', table) return Promise.delay(1000).then(function() {
assert.equal(expected, err.msg) return createTable(table, options)
log.info('Table "%s" already exists', table) })
})
.then(function() {
if (options.indexes) {
return Promise.all(Object.keys(options.indexes).map(function(index) {
return createIndex(table, index, options.indexes[index])
}))
}
})
}
function createIndex(table, index, fn) {
return rutil.run(conn, r.table(table).indexCreate(index, fn))
.then(function() {
log.info('Index "%s"."%s" created', table, index)
})
.catch(alreadyExistsError, function(err) {
log.info('Index "%s"."%s" already exists', table, index)
return Promise.resolve()
})
.catch(noMasterAvailableError, function(err) {
return Promise.delay(1000).then(function() {
return createIndex(table, index, fn)
})
}) })
} }

View file

@ -5,4 +5,11 @@ module.exports = {
, devices: { , devices: {
primaryKey: 'serial' primaryKey: 'serial'
} }
, logs: {
primaryKey: 'id'
, indexes: {
serial: null
, timestamp: null
}
}
} }

View file

@ -41,6 +41,10 @@ module.exports = function(options) {
var message = wire.LeaveGroupMessage.decode(wrapper.message) var message = wire.LeaveGroupMessage.decode(wrapper.message)
appDealer.send([channel, data]) appDealer.send([channel, data])
break break
case wire.MessageType.DEVICE_LOG:
var message = wire.DeviceLogMessage.decode(wrapper.message)
dbapi.saveDeviceLog(message.serial, message)
break
case wire.MessageType.DEVICE_POKE: case wire.MessageType.DEVICE_POKE:
var message = wire.DevicePokeMessage.decode(wrapper.message) var message = wire.DevicePokeMessage.decode(wrapper.message)
devDealer.send([message.channel, wireutil.makeProbeMessage()]) devDealer.send([message.channel, wireutil.makeProbeMessage()])

View file

@ -35,12 +35,25 @@ module.exports = function(options) {
// Show serial number in logs // Show serial number in logs
logger.setGlobalIdentifier(options.serial) logger.setGlobalIdentifier(options.serial)
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
log.info('Sending output to %s', endpoint)
push.connect(endpoint)
})
// Panic if necessary // Panic if necessary
Promise.onPossiblyUnhandledRejection(function(err, promise) { Promise.onPossiblyUnhandledRejection(function(err, promise) {
log.fatal('Unhandled rejection', err.stack) log.fatal('Unhandled rejection', err.stack)
selfDestruct() selfDestruct()
}) })
// Forward all logs
logger.on('entry', function(entry) {
push.send([wireutil.global,
wireutil.makeDeviceLogMessage(options.serial, entry)])
})
// Adb // Adb
var adb = Promise.promisifyAll(adbkit.createClient()) var adb = Promise.promisifyAll(adbkit.createClient())
@ -299,13 +312,6 @@ module.exports = function(options) {
} }
}) })
// Output
var push = zmq.socket('push')
options.endpoints.push.forEach(function(endpoint) {
log.info('Sending output to %s', endpoint)
push.connect(endpoint)
})
function poke() { function poke() {
push.send([wireutil.global, push.send([wireutil.global,
wireutil.makeDevicePokeMessage(options.serial, solo)]) wireutil.makeDevicePokeMessage(options.serial, solo)])

View file

@ -6,20 +6,28 @@ var colors = require('colors')
function Log(tag, stream) { function Log(tag, stream) {
this.tag = tag this.tag = tag
this.levels = { this.levels = {
DEBUG: 'DBG' DEBUG: 1
, VERBOSE: 'VRB' , VERBOSE: 2
, INFO: 'INF' , INFO: 3
, WARNING: 'WRN' , WARNING: 4
, ERROR: 'ERR' , ERROR: 5
, FATAL: 'FTL' , FATAL: 6
}
this.names = {
1: 'DBG'
, 2: 'VRB'
, 3: 'INF'
, 4: 'WRN'
, 5: 'ERR'
, 6: 'FTL'
} }
this.colors = { this.colors = {
DBG: 'grey' 1: 'grey'
, VRB: 'cyan' , 2: 'cyan'
, INF: 'green' , 3: 'green'
, WRN: 'yellow' , 4: 'yellow'
, ERR: 'red' , 5: 'red'
, FTL: 'red' , 6: 'red'
} }
this.localIdentifier = null this.localIdentifier = null
events.EventEmitter.call(this) events.EventEmitter.call(this)
@ -27,8 +35,8 @@ function Log(tag, stream) {
util.inherits(Log, events.EventEmitter) util.inherits(Log, events.EventEmitter)
Log.Entry = function(time, priority, tag, pid, identifier, message) { Log.Entry = function(timestamp, priority, tag, pid, identifier, message) {
this.time = time this.timestamp = timestamp
this.priority = priority this.priority = priority
this.tag = tag this.tag = tag
this.pid = pid this.pid = pid
@ -64,24 +72,20 @@ Log.prototype.fatal = function() {
this._write(this._entry(this.levels.FATAL, arguments)) this._write(this._entry(this.levels.FATAL, arguments))
} }
Log.prototype._color = function(priority) {
return priority[this.colors[priority]]
}
Log.prototype._entry = function(priority, args) { Log.prototype._entry = function(priority, args) {
return new Log.Entry( return new Log.Entry(
Date.now() Date.now()
, priority , priority
, this.tag , this.tag
, process.pid , process.pid
, this.localIdentifier || Log.globalIdentifier , this.localIdentifier || Logger.globalIdentifier
, util.format.apply(util, args) , util.format.apply(util, args)
) )
} }
Log.prototype._format = function(entry) { Log.prototype._format = function(entry) {
return util.format('%s/%s %d [%s] %s' return util.format('%s/%s %d [%s] %s'
, this._color(entry.priority) , this._name(entry.priority)
, entry.tag , entry.tag
, entry.pid , entry.pid
, entry.identifier , entry.identifier
@ -89,20 +93,27 @@ Log.prototype._format = function(entry) {
) )
} }
Log.prototype._name = function(priority) {
return this.names[priority][this.colors[priority]]
}
Log.prototype._write = function(entry) { Log.prototype._write = function(entry) {
console.error(this._format(entry)) console.error(this._format(entry))
this.emit('entry', entry) this.emit('entry', entry)
Logger.emit('entry', entry)
} }
Log.globalIdentifier = '*' var Logger = new events.EventEmitter()
Log.createLogger = function(tag) { Logger.globalIdentifier = '*'
Logger.createLogger = function(tag) {
return new Log(tag) return new Log(tag)
} }
Log.setGlobalIdentifier = function(identifier) { Logger.setGlobalIdentifier = function(identifier) {
Log.globalIdentifier = identifier Logger.globalIdentifier = identifier
return Log return Logger
} }
exports = module.exports = Log exports = module.exports = Logger

View file

@ -55,6 +55,18 @@ module.exports = function(wire) {
, envelope: function(type, message) { , envelope: function(type, message) {
return new wire.Envelope(type, message.encode()).encodeNB() return new wire.Envelope(type, message.encode()).encodeNB()
} }
, makeDeviceLogMessage: function(serial, entry) {
var message = new wire.DeviceLogMessage(
serial
, entry.timestamp / 1000
, entry.priority
, entry.tag
, entry.pid
, entry.message
)
return wireutil.envelope(wire.MessageType.DEVICE_LOG, message)
}
, makeGroupMessage: function(channel, timeout, requirements) { , makeGroupMessage: function(channel, timeout, requirements) {
var message = new wire.GroupMessage( var message = new wire.GroupMessage(
channel channel

View file

@ -14,6 +14,7 @@ enum MessageType {
DEVICE_DONE = 11; DEVICE_DONE = 11;
DEVICE_FAIL = 12; DEVICE_FAIL = 12;
DEVICE_IDENTITY = 13; DEVICE_IDENTITY = 13;
DEVICE_LOG = 14;
} }
message Envelope { message Envelope {
@ -21,6 +22,17 @@ message Envelope {
required bytes message = 2; required bytes message = 2;
} }
// Logging
message DeviceLogMessage {
required string serial = 1;
required double timestamp = 2;
required uint32 priority = 3;
required string tag = 4;
required uint32 pid = 5;
required string message = 6;
}
// Introductions // Introductions
message DevicePokeMessage { message DevicePokeMessage {