1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 10:19:30 +02:00

Get rid of switch statements in favor of an EventEmitter-based router for message handling.

This commit is contained in:
Simo Kinnunen 2014-02-02 19:55:44 +09:00
parent ade611670e
commit 48d5648b9d
8 changed files with 224 additions and 188 deletions

View file

@ -168,18 +168,14 @@ module.exports = function(options) {
var channels = [] var channels = []
, group = socket.handshake.user.group , group = socket.handshake.user.group
function messageListener(channel, wrapper) { var messageListener = wirerouter()
switch (wrapper.type) { .on(wire.MessageType.JOIN_GROUP, function(channel, message) {
case wire.MessageType.JOIN_GROUP: socket.emit('join', message)
var message = wire.JoinGroupMessage.decode(wrapper.message) })
socket.emit('join', message) .on(wire.MessageType.LEAVE_GROUP, function(channel, message) {
break socket.emit('leave', message)
case wire.MessageType.LEAVE_GROUP: })
var message = wire.LeaveGroupMessage.decode(wrapper.message) .handler()
socket.emit('leave', message)
break
}
}
// Global messages // Global messages
// //

View file

@ -13,6 +13,7 @@ var split = require('split')
var logger = require('../util/logger') var logger = require('../util/logger')
var wire = require('../wire') var wire = require('../wire')
var wireutil = require('../util/wireutil')(wire) var wireutil = require('../util/wireutil')(wire)
var wirerouter = require('../wire/router')
var devutil = require('../util/devutil') var devutil = require('../util/devutil')
var pathutil = require('../util/pathutil') var pathutil = require('../util/pathutil')
var promiseutil = require('../util/promiseutil') var promiseutil = require('../util/promiseutil')
@ -326,90 +327,86 @@ module.exports = function(options) {
selfDestruct() selfDestruct()
}) })
sub.on('message', function(channel, data) { sub.on('message', wirerouter()
var wrapper = wire.Envelope.decode(data) .on(wire.MessageType.ProbeMessage, function(channel, message) {
channels.keepalive(channel) push.send([channel,
switch (wrapper.type) { wireutil.makeDeviceIdentityMessage(options.serial, identity)])
case wire.MessageType.PROBE: channels.keepalive(channel)
var message = wire.ProbeMessage.decode(wrapper.message) })
push.send([channel, .on(wire.MessageType.GroupMessage, function(channel, message) {
wireutil.makeDeviceIdentityMessage(options.serial, identity)]) var groupChannel = message.channel
break if (devutil.matchesRequirements(identity, message.requirements)) {
case wire.MessageType.GROUP: channels.register(groupChannel, message.timeout)
var message = wire.GroupMessage.decode(wrapper.message) log.info('Subscribing to group channel "%s"', groupChannel)
, groupChannel = message.channel sub.subscribe(groupChannel)
if (devutil.matchesRequirements(identity, message.requirements)) { push.send([groupChannel,
channels.register(groupChannel, message.timeout) wireutil.makeJoinGroupMessage(options.serial)])
log.info('Subscribing to group channel "%s"', groupChannel) }
sub.subscribe(groupChannel) channels.keepalive(channel)
push.send([groupChannel, })
wireutil.makeJoinGroupMessage(options.serial)]) .on(wire.MessageType.UngroupMessage, function(channel, message) {
} var groupChannel = message.channel
break if (devutil.matchesRequirements(identity, message.requirements)) {
case wire.MessageType.UNGROUP: channels.unregister(groupChannel)
var message = wire.UngroupMessage.decode(wrapper.message) log.info('Unsubscribing from group channel "%s"', groupChannel)
, groupChannel = message.channel sub.unsubscribe(groupChannel)
if (devutil.matchesRequirements(identity, message.requirements)) { push.send([groupChannel,
channels.unregister(groupChannel) wireutil.makeLeaveGroupMessage(options.serial)])
log.info('Unsubscribing from group channel "%s"', groupChannel) }
sub.unsubscribe(groupChannel) channels.keepalive(channel)
push.send([groupChannel, })
wireutil.makeLeaveGroupMessage(options.serial)]) .on(wire.MessageType.ShellCommandMessage, function(channel, message) {
} log.info('Running shell command "%s"', message.command.join(' '))
break adb.shellAsync(options.serial, message.command)
case wire.MessageType.SHELL_COMMAND: .then(function(stream) {
var message = wire.ShellCommandMessage.decode(wrapper.message) var resolver = Promise.defer()
log.info('Running shell command "%s"', message.command.join(' ')) , seq = 0
adb.shellAsync(options.serial, message.command)
.then(function(stream) {
var resolver = Promise.defer()
, seq = 0
function dataListener(chunk) { function dataListener(chunk) {
push.send([message.channel,
wireutil.makeDeviceDataMessage(
options.serial
, seq++
, chunk
)])
}
function endListener() {
push.send([message.channel,
wireutil.makeDeviceDoneMessage(options.serial)])
resolver.resolve()
}
function errorListener(err) {
log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message)
resolver.reject(err)
push.send([message.channel,
wireutil.makeDeviceFailMessage(
options.serial
, err.message
)])
}
stream.on('data', dataListener)
stream.on('end', endListener)
stream.on('error', errorListener)
return resolver.promise.finally(function() {
stream.removeListener('data', dataListener)
stream.removeListener('end', endListener)
stream.removeListener('error', errorListener)
})
})
.error(function(err) {
log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message)
push.send([message.channel, push.send([message.channel,
wire.makeDeviceFailMessage(options.serial, err.message)]) wireutil.makeShellCommandDataMessage(
options.serial
, seq++
, chunk
)])
}
function endListener() {
push.send([message.channel,
wireutil.makeShellCommandDoneMessage(options.serial)])
resolver.resolve()
}
function errorListener(err) {
log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message)
resolver.reject(err)
push.send([message.channel,
wireutil.makeShellCommandFailMessage(
options.serial
, err.message
)])
}
stream.on('data', dataListener)
stream.on('end', endListener)
stream.on('error', errorListener)
return resolver.promise.finally(function() {
stream.removeListener('data', dataListener)
stream.removeListener('end', endListener)
stream.removeListener('error', errorListener)
}) })
break })
} .error(function(err) {
}) log.error('Shell command "%s" failed due to "%s"'
, message.command.join(' '), err.message)
push.send([message.channel,
wire.makeShellCommandFailMessage(options.serial, err.message)])
})
channels.keepalive(channel)
})
.handler())
function poke() { function poke() {
push.send([wireutil.global, push.send([wireutil.global,

View file

@ -6,6 +6,7 @@ var zmq = require('zmq')
var logger = require('../../util/logger') var logger = require('../../util/logger')
var wire = require('../../wire') var wire = require('../../wire')
var wirerouter = require('../../wire/router')
var wireutil = require('../../util/wireutil')(wire) var wireutil = require('../../util/wireutil')(wire)
module.exports = function(options) { module.exports = function(options) {
@ -27,19 +28,15 @@ module.exports = function(options) {
sub.subscribe(channel) sub.subscribe(channel)
}) })
sub.on('message', function(channel, data) { sub.on('message', wirerouter()
var wrapper = wire.Envelope.decode(data) .on(wire.MessageType.DeviceLogMessage, function(channel, message) {
switch (wrapper.type) { if (message.priority >= options.priority) {
case wire.MessageType.DEVICE_LOG: buffer.push(message)
var message = wire.DeviceLogMessage.decode(wrapper.message) clearTimeout(timer)
if (message.priority >= options.priority) { timer = setTimeout(push, 1000)
buffer.push(message) }
clearTimeout(timer) })
timer = setTimeout(push, 1000) .handler())
}
break
}
})
function push() { function push() {
var messages = buffer.splice(0).map(function(entry) { var messages = buffer.splice(0).map(function(entry) {

View file

@ -2,6 +2,7 @@ var zmq = require('zmq')
var logger = require('../util/logger') var logger = require('../util/logger')
var wire = require('../wire') var wire = require('../wire')
var wirerouter = require('../wire/router')
var wireutil = require('../util/wireutil')(wire) var wireutil = require('../util/wireutil')(wire)
var dbapi = require('../db/api') var dbapi = require('../db/api')
@ -30,49 +31,34 @@ module.exports = function(options) {
devDealer.connect(endpoint) devDealer.connect(endpoint)
}) })
devDealer.on('message', function(channel, data) { devDealer.on('message', wirerouter()
var wrapper = wire.Envelope.decode(data) .on(wire.MessageType.JoinGroupMessage, function(channel, message, data) {
switch (wrapper.type) { appDealer.send([channel, data])
case wire.MessageType.JOIN_GROUP: })
var message = wire.JoinGroupMessage.decode(wrapper.message) .on(wire.MessageType.LeaveGroupMessage, function(channel, message, data) {
appDealer.send([channel, data]) appDealer.send([channel, data])
break })
case wire.MessageType.LEAVE_GROUP: .on(wire.MessageType.DeviceLogMessage, function(channel, message, data) {
var message = wire.LeaveGroupMessage.decode(wrapper.message) dbapi.saveDeviceLog(message.serial, message)
appDealer.send([channel, data]) appDealer.send([channel, data])
break })
case wire.MessageType.DEVICE_LOG: .on(wire.MessageType.DevicePokeMessage, function(channel, message) {
var message = wire.DeviceLogMessage.decode(wrapper.message) devDealer.send([message.channel, wireutil.makeProbeMessage()])
dbapi.saveDeviceLog(message.serial, message) })
appDealer.send([channel, data]) .on(wire.MessageType.DeviceIdentityMessage, function(channel, message) {
break dbapi.saveDeviceIdentity(message.serial, message)
case wire.MessageType.DEVICE_POKE: })
var message = wire.DevicePokeMessage.decode(wrapper.message) .on(wire.MessageType.DeviceStatusMessage, function(channel, message) {
devDealer.send([message.channel, wireutil.makeProbeMessage()]) dbapi.saveDeviceStatus(message.serial, message)
break })
case wire.MessageType.DEVICE_IDENTITY: .on(wire.MessageType.DeviceShellCommandDataMessage, function(channel, message, data) {
var message = wire.DeviceIdentityMessage.decode(wrapper.message) appDealer.send([channel, data])
dbapi.saveDeviceIdentity(message.serial, message) })
break .on(wire.MessageType.DeviceShellCommandDoneMessage, function(channel, message, data) {
case wire.MessageType.DEVICE_STATUS: appDealer.send([channel, data])
var message = wire.DeviceStatusMessage.decode(wrapper.message) })
dbapi.saveDeviceStatus(message.serial, message) .on(wire.MessageType.DeviceShellCommandFailMessage, function(channel, message, data) {
break appDealer.send([channel, data])
case wire.MessageType.DEVICE_PROPERTIES: })
var message = wire.DevicePropertiesMessage.decode(wrapper.message) .handler())
break
case wire.MessageType.DEVICE_DATA:
var message = wire.DeviceDataMessage.decode(wrapper.message)
appDealer.send([channel, data])
break
case wire.MessageType.DEVICE_DONE:
var message = wire.DeviceDoneMessage.decode(wrapper.message)
appDealer.send([channel, data])
break
case wire.MessageType.DEVICE_FAIL:
var message = wire.DeviceFailMessage.decode(wrapper.message)
appDealer.send([channel, data])
break
}
})
} }

View file

@ -30,7 +30,7 @@ module.exports = function(wire) {
, entry.identifier , entry.identifier
) )
return wireutil.envelope(wire.MessageType.DEVICE_LOG, message) return wireutil.envelope(wire.MessageType.DeviceLogMessage, message)
} }
, makeGroupMessage: function(channel, timeout, requirements) { , makeGroupMessage: function(channel, timeout, requirements) {
var message = new wire.GroupMessage( var message = new wire.GroupMessage(
@ -39,23 +39,23 @@ module.exports = function(wire) {
, requirements , requirements
) )
return wireutil.envelope(wire.MessageType.GROUP, message) return wireutil.envelope(wire.MessageType.GroupMessage, message)
} }
, makeUngroupMessage: function(requirements) { , makeUngroupMessage: function(requirements) {
var message = new wire.UngroupMessage(requirements) var message = new wire.UngroupMessage(requirements)
return wireutil.envelope(wire.MessageType.UNGROUP, message) return wireutil.envelope(wire.MessageType.UngroupMessage, message)
} }
, makeJoinGroupMessage: function(serial) { , makeJoinGroupMessage: function(serial) {
var message = new wire.JoinGroupMessage(serial) var message = new wire.JoinGroupMessage(serial)
return wireutil.envelope(wire.MessageType.JOIN_GROUP, message) return wireutil.envelope(wire.MessageType.JoinGroupMessage, message)
} }
, makeLeaveGroupMessage: function(serial) { , makeLeaveGroupMessage: function(serial) {
var message = new wire.LeaveGroupMessage(serial) var message = new wire.LeaveGroupMessage(serial)
return wireutil.envelope(wire.MessageType.LEAVE_GROUP, message) return wireutil.envelope(wire.MessageType.LeaveGroupMessage, message)
} }
, makeDevicePokeMessage: function(serial, channel) { , makeDevicePokeMessage: function(serial, channel) {
var message = new wire.DevicePokeMessage(serial, channel) var message = new wire.DevicePokeMessage(serial, channel)
return wireutil.envelope(wire.MessageType.DEVICE_POKE, message) return wireutil.envelope(wire.MessageType.DevicePokeMessage, message)
} }
, makeDeviceIdentityMessage: function(serial, identity) { , makeDeviceIdentityMessage: function(serial, identity) {
var message = new wire.DeviceIdentityMessage( var message = new wire.DeviceIdentityMessage(
@ -81,7 +81,7 @@ module.exports = function(wire) {
) )
) )
return wireutil.envelope(wire.MessageType.DEVICE_IDENTITY, message) return wireutil.envelope(wire.MessageType.DeviceIdentityMessage, message)
} }
, makeDevicePropertiesMessage: function(serial, properties) { , makeDevicePropertiesMessage: function(serial, properties) {
var message = new wire.DevicePropertiesMessage( var message = new wire.DevicePropertiesMessage(
@ -91,7 +91,10 @@ module.exports = function(wire) {
}) })
) )
return wireutil.envelope(wire.MessageType.DEVICE_PROPERTIES, message) return wireutil.envelope(
wire.MessageType.DevicePropertiesMessage
, message
)
} }
, makeDeviceStatusMessage: function(serial, type, provider) { , makeDeviceStatusMessage: function(serial, type, provider) {
var message = new wire.DeviceStatusMessage( var message = new wire.DeviceStatusMessage(
@ -100,27 +103,36 @@ module.exports = function(wire) {
, provider , provider
) )
return wireutil.envelope(wire.MessageType.DEVICE_STATUS, message) return wireutil.envelope(wire.MessageType.DeviceStatusMessage, message)
} }
, makeProbeMessage: function() { , makeProbeMessage: function() {
var message = new wire.ProbeMessage() var message = new wire.ProbeMessage()
return wireutil.envelope(wire.MessageType.PROBE, message) return wireutil.envelope(wire.MessageType.ProbeMessage, message)
} }
, makeShellCommandMessage: function(channel, command) { , makeShellCommandMessage: function(channel, command) {
var message = new wire.ShellCommandMessage(channel, command) var message = new wire.ShellCommandMessage(channel, command)
return wireutil.envelope(wire.MessageType.SHELL_COMMAND, message) return wireutil.envelope(wire.MessageType.ShellCommandMessage, message)
} }
, makeDeviceDataMessage: function(serial, seq, chunk) { , makeShellCommandDataMessage: function(serial, seq, chunk) {
var message = new wire.DeviceDataMessage(serial, seq, chunk) var message = new wire.ShellCommandDataMessage(serial, seq, chunk)
return wireutil.envelope(wire.MessageType.DEVICE_DATA, message) return wireutil.envelope(
wire.MessageType.ShellCommandDataMessage
, message
)
} }
, makeDeviceDoneMessage: function(serial) { , makeShellCommandDoneMessage: function(serial) {
var message = new wire.DeviceDoneMessage(serial) var message = new wire.ShellCommandDoneMessage(serial)
return wireutil.envelope(wire.MessageType.DEVICE_DONE, message) return wireutil.envelope(
wire.MessageType.ShellCommandDoneMessage
, message
)
} }
, makeDeviceFailMessage: function(serial, reason) { , makeShellCommandFailMessage: function(serial, reason) {
var message = new wire.DeviceFailMessage(serial, reason) var message = new wire.ShellCommandFailMessage(serial, reason)
return wireutil.envelope(wire.MessageType.DEVICE_FAIL, message) return wireutil.envelope(
wire.MessageType.ShellCommandFailMessage
, message
)
} }
} }

View file

@ -2,5 +2,15 @@ var path = require('path')
var ProtoBuf = require('protobufjs') var ProtoBuf = require('protobufjs')
module.exports = var wire = ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build()
ProtoBuf.loadProtoFile(path.join(__dirname, 'wire.proto')).build()
wire.ReverseMessageType = Object.keys(wire.MessageType)
.reduce(
function(acc, type) {
acc[wire.MessageType[type]] = type
return acc
}
, Object.create(null)
)
module.exports = wire

39
lib/wire/router.js Normal file
View file

@ -0,0 +1,39 @@
var events = require('events')
var util = require('util')
var wire = require('./')
var log = require('../util/logger').createLogger('wire:router')
function Router() {
if (!(this instanceof Router)) {
return new Router()
}
events.EventEmitter.call(this)
}
util.inherits(Router, events.EventEmitter)
Router.prototype.handler = function() {
return function(channel, data) {
var wrapper = wire.Envelope.decode(data)
, type = wire.ReverseMessageType[wrapper.type]
if (type) {
this.emit(
wrapper.type
, channel
, wire[type].decode(wrapper.message)
, data
)
}
else {
log.warn(
'Unknown message type "%d", perhaps we need an update?'
, wrapper.type
)
}
}.bind(this)
}
module.exports = Router

View file

@ -1,21 +1,20 @@
// Message wrapper // Message wrapper
enum MessageType { enum MessageType {
DEVICE_POKE = 1; DevicePokeMessage = 1;
DEVICE_STATUS = 2; DeviceStatusMessage = 2;
DEVICE_TYPE = 3; DevicePropertiesMessage = 4;
DEVICE_PROPERTIES = 4; GroupMessage = 5;
GROUP = 5; UngroupMessage = 15;
UNGROUP = 15; JoinGroupMessage = 6;
JOIN_GROUP = 6; LeaveGroupMessage = 7;
LEAVE_GROUP = 7; ProbeMessage = 8;
PROBE = 8; ShellCommandMessage = 9;
SHELL_COMMAND = 9; ShellCommandDataMessage = 10;
DEVICE_DATA = 10; ShellCommandDoneMessage = 11;
DEVICE_DONE = 11; ShellCommandFailMessage = 12;
DEVICE_FAIL = 12; DeviceIdentityMessage = 13;
DEVICE_IDENTITY = 13; DeviceLogMessage = 14;
DEVICE_LOG = 14;
} }
message Envelope { message Envelope {