From df3a29d94cd6cf6bcdc2d6d1b05e2a6eb6239983 Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Fri, 21 Feb 2014 14:44:23 +0900 Subject: [PATCH] Use promises for socket client for better reliability. --- lib/roles/app.js | 154 +++++++++++++++++++++++++---------------------- 1 file changed, 83 insertions(+), 71 deletions(-) diff --git a/lib/roles/app.js b/lib/roles/app.js index 7260bbe7..f971c04b 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -24,7 +24,7 @@ module.exports = function(options) { , app = express() , server = http.createServer(app) , io = socketio.listen(server) - , groupRouter = new events.EventEmitter() + , channelRouter = new events.EventEmitter() app.set('view engine', 'jade') app.set('views', pathutil.resource('app/views')) @@ -76,7 +76,7 @@ module.exports = function(options) { }) sub.on('message', function(channel, data) { - groupRouter.emit(channel.toString(), channel, data) + channelRouter.emit(channel.toString(), channel, data) }) app.get('/partials/*', function(req, res) { @@ -202,13 +202,13 @@ module.exports = function(options) { function joinChannel(channel) { channels.push(channel) - groupRouter.on(channel, messageListener) + channelRouter.on(channel, messageListener) sub.subscribe(channel) } function leaveChannel(channel) { _.pull(channels, channel) - groupRouter.removeListener(channel, messageListener) + channelRouter.removeListener(channel, messageListener) sub.unsubscribe(channel) } @@ -267,80 +267,92 @@ module.exports = function(options) { // @todo Use socket.io to push global events to all clients instead // of listening on every connection, otherwise we're very likely to // hit EventEmitter's leak complaints (plus it's more work) - groupRouter.on(wireutil.global, messageListener) + channelRouter.on(wireutil.global, messageListener) // User's private group joinChannel(user.group) - socket - // Clean up all listeners and subscriptions - .on('disconnect', function() { - groupRouter.removeListener(wireutil.global, messageListener) - channels.forEach(function(channel) { - groupRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) + new Promise(function(resolve, reject) { + socket.on('disconnect', resolve) + // Grouping + .on('group.invite', function(data) { + push.send([ + wireutil.global + , wireutil.envelope(new wire.GroupMessage( + new wire.OwnerMessage( + user.email + , user.name + , user.group + ) + , options.groupTimeout + , wireutil.toDeviceRequirements(data) + )) + ]) }) - }) - // Grouping - .on('group.invite', function(data) { - push.send([ - wireutil.global - , wireutil.envelope(new wire.GroupMessage( - new wire.OwnerMessage( - user.email - , user.name - , user.group + .on('group.kick', function(data) { + push.send([ + user.group + , wireutil.envelope(new wire.UngroupMessage( + wireutil.toDeviceRequirements(data) + )) + ]) + }) + // Touch events + .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) + .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) + .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) + .on('input.tap', createTouchHandler(wire.TapMessage)) + // Key events + .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) + .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) + .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) + .on('input.type', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.TypeMessage( + data.text + )) + ]) + }) + // Transactions + .on('tx.cleanup', function(channel) { + leaveChannel(channel) + }) + .on('shell.command', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ShellCommandMessage(data) ) - , options.groupTimeout - , wireutil.toDeviceRequirements(data) - )) - ]) - }) - .on('group.kick', function(data) { - push.send([ - user.group - , wireutil.envelope(new wire.UngroupMessage( - wireutil.toDeviceRequirements(data) - )) - ]) - }) - // Touch events - .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) - .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) - .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) - .on('input.tap', createTouchHandler(wire.TapMessage)) - // Key events - .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) - .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) - .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) - .on('input.type', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.TypeMessage( - data.text - )) - ]) - }) - // Transactions - .on('tx.cleanup', function(channel) { - leaveChannel(channel) - }) - .on('shell.command', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ShellCommandMessage(data) - ) - ]) - }) - .on('shell.keepalive', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) - ]) + ]) + }) + .on('shell.keepalive', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) + ]) + }) + }) + .finally(function() { + // Clean up all listeners and subscriptions + channelRouter.removeListener(wireutil.global, messageListener) + channels.forEach(function(channel) { + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) }) + }) + .catch(function(err) { + // Cannot guarantee integrity of client + log.error( + 'Client had an error, disconnecting due to probable loss of integrity' + , err.stack + ) + + socket.disconnect(true) + }) + /* socket.on('flick', function(data) {})