diff --git a/lib/db/api.js b/lib/db/api.js index d7525c73..e51dadf2 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -13,6 +13,17 @@ module.exports.saveUserAfterLogin = function(user) { })) } +module.exports.saveDeviceLog = function(serial, entry) { + return db.run(r.table('logs').insert({ + serial: entry.serial + , timestamp: r.epochTime(entry.timestamp) + , priority: entry.priority + , tag: entry.tag + , pid: entry.pid + , message: entry.message + })) +} + module.exports.saveDeviceStatus = function(serial, status) { return db.run(r.table('devices').get(serial).update({ status: status diff --git a/lib/db/setup.js b/lib/db/setup.js index e5806bfd..bc353f0a 100644 --- a/lib/db/setup.js +++ b/lib/db/setup.js @@ -11,28 +11,64 @@ var tables = require('./tables') module.exports = function(conn) { var log = logger.createLogger('db:setup') + function alreadyExistsError(err) { + return err.msg && err.msg.indexOf('already exists') !== -1 + } + + function noMasterAvailableError(err) { + return err.msg && err.msg.indexOf('No master available') !== -1 + } + function createDatabase() { return rutil.run(conn, r.dbCreate(conn.db)) .then(function() { log.info('Database "%s" created', conn.db) }) - .catch(rutil.errors.RqlRuntimeError, function(err) { - var expected = util.format('Database `%s` already exists.', conn.db) - assert.equal(expected, err.msg) + .catch(alreadyExistsError, function(err) { log.info('Database "%s" already exists', conn.db) + return Promise.resolve() }) } function createTable(table, options) { - return rutil.run(conn, r.tableCreate(table, options)) + var tableOptions = { + primaryKey: options.primaryKey + } + return rutil.run(conn, r.tableCreate(table, tableOptions)) .then(function() { log.info('Table "%s" created', table) + }) + .catch(alreadyExistsError, function(err) { + log.info('Table "%s" already exists', table) return Promise.resolve() }) - .catch(rutil.errors.RqlRuntimeError, function(err) { - var expected = util.format('Table `%s` already exists.', table) - assert.equal(expected, err.msg) - log.info('Table "%s" already exists', table) + .catch(noMasterAvailableError, function(err) { + return Promise.delay(1000).then(function() { + return createTable(table, options) + }) + }) + .then(function() { + if (options.indexes) { + return Promise.all(Object.keys(options.indexes).map(function(index) { + return createIndex(table, index, options.indexes[index]) + })) + } + }) + } + + function createIndex(table, index, fn) { + return rutil.run(conn, r.table(table).indexCreate(index, fn)) + .then(function() { + log.info('Index "%s"."%s" created', table, index) + }) + .catch(alreadyExistsError, function(err) { + log.info('Index "%s"."%s" already exists', table, index) + return Promise.resolve() + }) + .catch(noMasterAvailableError, function(err) { + return Promise.delay(1000).then(function() { + return createIndex(table, index, fn) + }) }) } diff --git a/lib/db/tables.js b/lib/db/tables.js index 76e469b4..d692d8a5 100644 --- a/lib/db/tables.js +++ b/lib/db/tables.js @@ -5,4 +5,11 @@ module.exports = { , devices: { primaryKey: 'serial' } +, logs: { + primaryKey: 'id' + , indexes: { + serial: null + , timestamp: null + } + } } diff --git a/lib/roles/coordinator.js b/lib/roles/coordinator.js index 6eeb4cff..54c04273 100644 --- a/lib/roles/coordinator.js +++ b/lib/roles/coordinator.js @@ -41,6 +41,10 @@ module.exports = function(options) { var message = wire.LeaveGroupMessage.decode(wrapper.message) appDealer.send([channel, data]) break + case wire.MessageType.DEVICE_LOG: + var message = wire.DeviceLogMessage.decode(wrapper.message) + dbapi.saveDeviceLog(message.serial, message) + break case wire.MessageType.DEVICE_POKE: var message = wire.DevicePokeMessage.decode(wrapper.message) devDealer.send([message.channel, wireutil.makeProbeMessage()]) diff --git a/lib/roles/device.js b/lib/roles/device.js index ad961d3e..4aa40685 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -35,12 +35,25 @@ module.exports = function(options) { // Show serial number in logs logger.setGlobalIdentifier(options.serial) + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + // Panic if necessary Promise.onPossiblyUnhandledRejection(function(err, promise) { log.fatal('Unhandled rejection', err.stack) selfDestruct() }) + // Forward all logs + logger.on('entry', function(entry) { + push.send([wireutil.global, + wireutil.makeDeviceLogMessage(options.serial, entry)]) + }) + // Adb var adb = Promise.promisifyAll(adbkit.createClient()) @@ -299,13 +312,6 @@ module.exports = function(options) { } }) - // Output - var push = zmq.socket('push') - options.endpoints.push.forEach(function(endpoint) { - log.info('Sending output to %s', endpoint) - push.connect(endpoint) - }) - function poke() { push.send([wireutil.global, wireutil.makeDevicePokeMessage(options.serial, solo)]) diff --git a/lib/util/logger.js b/lib/util/logger.js index 46de6afa..83f03409 100644 --- a/lib/util/logger.js +++ b/lib/util/logger.js @@ -6,20 +6,28 @@ var colors = require('colors') function Log(tag, stream) { this.tag = tag this.levels = { - DEBUG: 'DBG' - , VERBOSE: 'VRB' - , INFO: 'INF' - , WARNING: 'WRN' - , ERROR: 'ERR' - , FATAL: 'FTL' + DEBUG: 1 + , VERBOSE: 2 + , INFO: 3 + , WARNING: 4 + , ERROR: 5 + , FATAL: 6 + } + this.names = { + 1: 'DBG' + , 2: 'VRB' + , 3: 'INF' + , 4: 'WRN' + , 5: 'ERR' + , 6: 'FTL' } this.colors = { - DBG: 'grey' - , VRB: 'cyan' - , INF: 'green' - , WRN: 'yellow' - , ERR: 'red' - , FTL: 'red' + 1: 'grey' + , 2: 'cyan' + , 3: 'green' + , 4: 'yellow' + , 5: 'red' + , 6: 'red' } this.localIdentifier = null events.EventEmitter.call(this) @@ -27,8 +35,8 @@ function Log(tag, stream) { util.inherits(Log, events.EventEmitter) -Log.Entry = function(time, priority, tag, pid, identifier, message) { - this.time = time +Log.Entry = function(timestamp, priority, tag, pid, identifier, message) { + this.timestamp = timestamp this.priority = priority this.tag = tag this.pid = pid @@ -64,24 +72,20 @@ Log.prototype.fatal = function() { this._write(this._entry(this.levels.FATAL, arguments)) } -Log.prototype._color = function(priority) { - return priority[this.colors[priority]] -} - Log.prototype._entry = function(priority, args) { return new Log.Entry( Date.now() , priority , this.tag , process.pid - , this.localIdentifier || Log.globalIdentifier + , this.localIdentifier || Logger.globalIdentifier , util.format.apply(util, args) ) } Log.prototype._format = function(entry) { return util.format('%s/%s %d [%s] %s' - , this._color(entry.priority) + , this._name(entry.priority) , entry.tag , entry.pid , entry.identifier @@ -89,20 +93,27 @@ Log.prototype._format = function(entry) { ) } +Log.prototype._name = function(priority) { + return this.names[priority][this.colors[priority]] +} + Log.prototype._write = function(entry) { console.error(this._format(entry)) this.emit('entry', entry) + Logger.emit('entry', entry) } -Log.globalIdentifier = '*' +var Logger = new events.EventEmitter() -Log.createLogger = function(tag) { +Logger.globalIdentifier = '*' + +Logger.createLogger = function(tag) { return new Log(tag) } -Log.setGlobalIdentifier = function(identifier) { - Log.globalIdentifier = identifier - return Log +Logger.setGlobalIdentifier = function(identifier) { + Logger.globalIdentifier = identifier + return Logger } -exports = module.exports = Log +exports = module.exports = Logger diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js index 270e8b0d..082d0e50 100644 --- a/lib/util/wireutil.js +++ b/lib/util/wireutil.js @@ -55,6 +55,18 @@ module.exports = function(wire) { , envelope: function(type, message) { return new wire.Envelope(type, message.encode()).encodeNB() } + , makeDeviceLogMessage: function(serial, entry) { + var message = new wire.DeviceLogMessage( + serial + , entry.timestamp / 1000 + , entry.priority + , entry.tag + , entry.pid + , entry.message + ) + + return wireutil.envelope(wire.MessageType.DEVICE_LOG, message) + } , makeGroupMessage: function(channel, timeout, requirements) { var message = new wire.GroupMessage( channel diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index baf2ec2d..31a49bc0 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -14,6 +14,7 @@ enum MessageType { DEVICE_DONE = 11; DEVICE_FAIL = 12; DEVICE_IDENTITY = 13; + DEVICE_LOG = 14; } message Envelope { @@ -21,6 +22,17 @@ message Envelope { required bytes message = 2; } +// Logging + +message DeviceLogMessage { + required string serial = 1; + required double timestamp = 2; + required uint32 priority = 3; + required string tag = 4; + required uint32 pid = 5; + required string message = 6; +} + // Introductions message DevicePokeMessage {