From 984c45b18320235aba4db9068b1d3939d70b44cb Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Fri, 6 Jun 2014 15:02:29 +0900 Subject: [PATCH] Separate websocket to its own role. Necessary because the app was getting pretty big already, but mostly because our Windows PCs don't like to connect to websockets on port 80, which is what we use for the app. --- doc/topo-v1.ditaa | 2 +- lib/cli.js | 85 ++- lib/roles/app.js | 591 +----------------- lib/{ => roles/app}/middleware/auth.js | 6 +- .../app}/middleware/browser-icons.js | 2 +- .../app}/middleware/device-icons.js | 2 +- lib/{ => roles/app}/middleware/webpack.js | 6 +- lib/roles/websocket.js | 570 +++++++++++++++++ lib/roles/websocket/middleware/auth.js | 22 + .../websocket/middleware/cookie-session.js | 10 + lib/roles/websocket/middleware/remote-ip.js | 9 + .../components/stf/socket/socket-service.js | 4 +- res/app/views/index.jade | 1 + 13 files changed, 707 insertions(+), 603 deletions(-) rename lib/{ => roles/app}/middleware/auth.js (90%) rename lib/{ => roles/app}/middleware/browser-icons.js (75%) rename lib/{ => roles/app}/middleware/device-icons.js (74%) rename lib/{ => roles/app}/middleware/webpack.js (94%) create mode 100644 lib/roles/websocket.js create mode 100644 lib/roles/websocket/middleware/auth.js create mode 100644 lib/roles/websocket/middleware/cookie-session.js create mode 100644 lib/roles/websocket/middleware/remote-ip.js diff --git a/doc/topo-v1.ditaa b/doc/topo-v1.ditaa index 047bf79a..bbbabd2d 100644 --- a/doc/topo-v1.ditaa +++ b/doc/topo-v1.ditaa @@ -1,5 +1,5 @@ /------------\ /------------\ /------------\ - | app | | app | | app | + | websocket | | websocket | | websocket | |------------| |------------| |------------| x N | PUSH | SUB | | PUSH | SUB | | PUSH | SUB | \------------/ \------------/ \------------/ diff --git a/lib/cli.js b/lib/cli.js index 91d9964e..ff098236 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -402,6 +402,9 @@ program .option('-a, --auth-url ' , 'URL to auth client' , String) + .option('-w, --websocket-url ' + , 'URL to websocket client' + , String) .option('-r, --storage-url ' , 'URL to storage client' , String) @@ -411,12 +414,6 @@ program .option('--storage-plugin-apk-url ' , 'URL to apk storage plugin' , String) - .option('-u, --connect-sub ' - , 'sub endpoint' - , cliutil.list) - .option('-c, --connect-push ' - , 'push endpoint' - , cliutil.list) .option('-d, --disable-watch' , 'disable watching resources') .action(function(options) { @@ -426,6 +423,9 @@ program if (!options.authUrl) { this.missingArgument('--auth-url') } + if (!options.websocketUrl) { + this.missingArgument('--websocket-url') + } if (!options.storageUrl) { this.missingArgument('--storage-url') } @@ -435,6 +435,51 @@ program if (!options.storagePluginApkUrl) { this.missingArgument('--storage-plugin-apk-url') } + + require('./roles/app')({ + port: options.port + , secret: options.secret + , ssid: options.ssid + , authUrl: options.authUrl + , websocketUrl: options.websocketUrl + , storageUrl: options.storageUrl + , storagePluginImageUrl: options.storagePluginImageUrl + , storagePluginApkUrl: options.storagePluginApkUrl + , disableWatch: options.disableWatch + }) + }) + +program + .command('websocket') + .description('start websocket') + .option('-p, --port ' + , 'port (or $PORT)' + , Number + , process.env.PORT || 7110) + .option('-s, --secret ' + , 'secret (or $SECRET)' + , String + , process.env.SECRET) + .option('-i, --ssid ' + , 'session SSID (or $SSID)' + , String + , process.env.SSID || 'ssid') + .option('-r, --storage-url ' + , 'URL to storage client' + , String) + .option('-u, --connect-sub ' + , 'sub endpoint' + , cliutil.list) + .option('-c, --connect-push ' + , 'push endpoint' + , cliutil.list) + .action(function(options) { + if (!options.secret) { + this.missingArgument('--secret') + } + if (!options.storageUrl) { + this.missingArgument('--storage-url') + } if (!options.connectSub) { this.missingArgument('--connect-sub') } @@ -442,19 +487,15 @@ program this.missingArgument('--connect-push') } - require('./roles/app')({ + require('./roles/websocket')({ port: options.port , secret: options.secret , ssid: options.ssid - , authUrl: options.authUrl , storageUrl: options.storageUrl - , storagePluginImageUrl: options.storagePluginImageUrl - , storagePluginApkUrl: options.storagePluginApkUrl , endpoints: { sub: options.connectSub , push: options.connectPush } - , disableWatch: options.disableWatch }) }) @@ -621,6 +662,10 @@ program , 'app port' , Number , 7100) + .option('--websocket-port ' + , 'websocket port' + , Number + , 7110) .option('--storage-port ' , 'storage port' , Number @@ -729,14 +774,17 @@ program , options.publicIp , options.authPort ) + , '--websocket-url', util.format( + 'http://%s:%d/' + , options.publicIp + , options.websocketPort + ) , '--storage-url' , util.format('http://localhost:%d/', options.storagePort) , '--storage-plugin-image-url' , util.format('http://localhost:%d/', options.storagePluginImagePort) , '--storage-plugin-apk-url' , util.format('http://localhost:%d/', options.storagePluginApkPort) - , '--connect-sub', options.bindAppPub - , '--connect-push', options.bindAppPull ].concat((function() { var extra = [] if (options.disableWatch) { @@ -745,6 +793,17 @@ program return extra })())) + // websocket + , procutil.fork(__filename, [ + 'websocket' + , '--port', options.websocketPort + , '--secret', options.authSecret + , '--storage-url' + , util.format('http://localhost:%d/', options.storagePort) + , '--connect-sub', options.bindAppPub + , '--connect-push', options.bindAppPull + ]) + // storage , procutil.fork(__filename, [ 'storage-temp' diff --git a/lib/roles/app.js b/lib/roles/app.js index f06b78e3..2889cf91 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -1,7 +1,5 @@ var http = require('http') -var events = require('events') var path = require('path') -var util = require('util') var express = require('express') var validator = require('express-validator') @@ -10,38 +8,24 @@ var bodyParser = require('body-parser') var serveFavicon = require('serve-favicon') var serveStatic = require('serve-static') var csrf = require('csurf') -var socketio = require('socket.io') -var zmq = require('zmq') var Promise = require('bluebird') var httpProxy = require('http-proxy') -var _ = require('lodash') -var request = Promise.promisifyAll(require('request')) -var proxyaddr = require('proxy-addr') var logger = require('../util/logger') var pathutil = require('../util/pathutil') -var wire = require('../wire') -var wireutil = require('../wire/util') -var wirerouter = require('../wire/router') var dbapi = require('../db/api') var datautil = require('../util/datautil') -var auth = require('../middleware/auth') -var webpack = require('../middleware/webpack') -var deviceIconMiddleware = require('../middleware/device-icons') -var browserIconMiddleware = require('../middleware/browser-icons') +var auth = require('./app/middleware/auth') +var webpack = require('./app/middleware/webpack') +var deviceIconMiddleware = require('./app/middleware/device-icons') +var browserIconMiddleware = require('./app/middleware/browser-icons') module.exports = function(options) { var log = logger.createLogger('app') , app = express() , server = http.createServer(app) - , io = socketio.listen(server, { - serveClient: false - , transports: ['websocket'] - }) - , channelRouter = new events.EventEmitter() , proxy = httpProxy.createProxyServer() - , sessionMiddleware proxy.on('error', function(err) { log.error('Proxy had an error', err.stack) @@ -79,15 +63,21 @@ module.exports = function(options) { app.use(serveFavicon(pathutil.resource( 'bower_components/stf-graphics/logo/exports/STF-128.png'))) - app.use(sessionMiddleware = cookieSession({ + app.use(cookieSession({ name: options.ssid , keys: [options.secret] })) + app.use(auth({ secret: options.secret , authUrl: options.authUrl })) + // Variables for templates + app.locals.APP = { + websocketUrl: options.websocketUrl + } + // Proxied requests must come before any body parsers. These proxies are // here mainly for convenience, they should be replaced with proper reverse // proxies in production. @@ -113,30 +103,6 @@ module.exports = function(options) { app.use(csrf()) app.use(validator()) - // Output - var push = zmq.socket('push') - options.endpoints.push.forEach(function(endpoint) { - log.info('Sending output to %s', endpoint) - push.connect(endpoint) - }) - - // Input - var sub = zmq.socket('sub') - options.endpoints.sub.forEach(function(endpoint) { - log.info('Receiving input from %s', endpoint) - sub.connect(endpoint) - }) - - // Establish always-on channels - ;[wireutil.global].forEach(function(channel) { - log.info('Subscribing to permanent channel "%s"', channel) - sub.subscribe(channel) - }) - - sub.on('message', function(channel, data) { - channelRouter.emit(channel.toString(), channel, data) - }) - app.get('/partials/*', function(req, res) { var whitelist = { 'devices/index': true @@ -232,541 +198,6 @@ module.exports = function(options) { }) }) - io.use(function(socket, next) { - var req = socket.request - , res = Object.create(null) - sessionMiddleware(req, res, next) - }) - - io.use(function(socket, next) { - var req = socket.request - // This is similar to what Express does behind the scenes - req.ip = proxyaddr(req, app.get('trust proxy fn')) - next() - }) - - io.use(function(socket, next) { - var req = socket.request - , token = req.session.jwt - if (token) { - return dbapi.loadUser(token.email) - .then(function(user) { - if (user) { - req.user = user - next() - } - else { - next(new Error('Invalid user')) - } - }) - .catch(next) - } - else { - next(new Error('Missing authorization token')) - } - }) - - io.on('connection', function(socket) { - var channels = [] - , user = socket.request.user - - socket.emit('socket.ip', socket.request.ip) - - function joinChannel(channel) { - channels.push(channel) - channelRouter.on(channel, messageListener) - sub.subscribe(channel) - } - - function leaveChannel(channel) { - _.pull(channels, channel) - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - } - - function createTouchHandler(Klass) { - return function(channel, data) { - push.send([ - channel - , wireutil.envelope(new Klass( - data.seq - , data.x - , data.y - )) - ]) - } - } - - function createKeyHandler(Klass) { - return function(channel, data) { - push.send([ - channel - , wireutil.envelope(new Klass( - data.key - )) - ]) - } - } - - var messageListener = wirerouter() - .on(wire.DeviceLogMessage, function(channel, message) { - socket.emit('device.log', message) - }) - .on(wire.DevicePresentMessage, function(channel, message) { - socket.emit('device.add', { - important: true - , data: { - serial: message.serial - , present: true - } - }) - }) - .on(wire.DeviceAbsentMessage, function(channel, message) { - socket.emit('device.remove', { - important: true - , data: { - serial: message.serial - , present: false - , ready: false - , lastHeartbeatAt: null - , using: false - } - }) - }) - .on(wire.JoinGroupMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: datautil.applyOwner({ - serial: message.serial - , owner: message.owner - } - , user - ) - }) - }) - .on(wire.LeaveGroupMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: datautil.applyOwner({ - serial: message.serial - , owner: null - } - , user - ) - }) - }) - .on(wire.DeviceStatusMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: message - }) - }) - .on(wire.DeviceIdentityMessage, function(channel, message) { - datautil.applyData(message) - message.ready = true - socket.emit('device.change', { - important: true - , data: message - }) - }) - .on(wire.TransactionProgressMessage, function(channel, message) { - socket.emit('tx.progress', channel.toString(), message) - }) - .on(wire.TransactionDoneMessage, function(channel, message) { - socket.emit('tx.done', channel.toString(), message) - }) - .on(wire.DeviceLogcatEntryMessage, function(channel, message) { - socket.emit('logcat.entry', message) - }) - .on(wire.AirplaneModeEvent, function(channel, message) { - socket.emit('device.change', { - important: true - , data: { - serial: message.serial - , airplaneMode: message.enabled - } - }) - }) - .on(wire.BatteryEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , battery: message - } - }) - }) - .on(wire.DeviceBrowserMessage, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: true - , data: datautil.applyBrowsers({ - serial: serial - , browser: message - }) - }) - }) - .on(wire.ConnectivityEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , network: message - } - }) - }) - .on(wire.PhoneStateEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , network: message - } - }) - }) - .on(wire.RotationEvent, function(channel, message) { - socket.emit('device.change', { - important: false - , data: { - serial: message.serial - , display: { - orientation: message.rotation - } - } - }) - }) - .handler() - - // Global messages - // - // @todo Use socket.io to push global events to all clients instead - // of listening on every connection, otherwise we're very likely to - // hit EventEmitter's leak complaints (plus it's more work) - channelRouter.on(wireutil.global, messageListener) - - // User's private group - joinChannel(user.group) - - new Promise(function(resolve) { - socket.on('disconnect', resolve) - // Touch events - .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) - .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) - .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) - .on('input.tap', createTouchHandler(wire.TapMessage)) - // Key events - .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) - .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) - .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) - .on('input.type', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.TypeMessage( - data.text - )) - ]) - }) - .on('display.rotate', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.RotateMessage( - data.rotation - )) - ]) - }) - // Transactions - .on('clipboard.paste', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.PasteMessage(data.text) - ) - ]) - }) - .on('clipboard.copy', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.CopyMessage() - ) - ]) - }) - .on('device.identify', function(channel, responseChannel) { - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.PhysicalIdentifyMessage() - ) - ]) - }) - .on('group.invite', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.GroupMessage( - new wire.OwnerMessage( - user.email - , user.name - , user.group - ) - , data.timeout || null - , wireutil.toDeviceRequirements(data.requirements) - ) - ) - ]) - }) - .on('group.kick', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.UngroupMessage( - wireutil.toDeviceRequirements(data.requirements) - ) - ) - ]) - }) - .on('tx.cleanup', function(channel) { - leaveChannel(channel) - }) - .on('tx.punch', function(channel) { - joinChannel(channel) - socket.emit('tx.punch', channel) - }) - .on('shell.command', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ShellCommandMessage(data) - ) - ]) - }) - .on('shell.keepalive', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) - ]) - }) - .on('device.install', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.InstallMessage( - data.href - , data.launch === true - , JSON.stringify(data.manifest) - ) - ) - ]) - }) - .on('device.uninstall', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.UninstallMessage(data) - ) - ]) - }) - .on('storage.upload', function(channel, responseChannel, data) { - joinChannel(responseChannel) - request.postAsync({ - url: util.format( - '%sapi/v1/resources?channel=%s' - , options.storageUrl - , responseChannel - ) - , json: true - , body: { - url: data.url - } - }) - .catch(function(err) { - log.error('Storage upload had an error', err.stack) - leaveChannel(responseChannel) - socket.emit('tx.cancel', responseChannel, { - success: false - , data: 'fail_upload' - }) - }) - }) - .on('forward.test', function(channel, responseChannel, data) { - joinChannel(responseChannel) - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = socket.request.ip - } - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardTestMessage(data) - ) - ]) - }) - .on('forward.create', function(channel, responseChannel, data) { - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = socket.request.ip - } - dbapi.addUserForward(user.email, data) - .then(function() { - socket.emit('forward.create', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardCreateMessage(data) - ) - ]) - }) - }) - .on('forward.remove', function(channel, responseChannel, data) { - dbapi.removeUserForward(user.email, data.devicePort) - .then(function() { - socket.emit('forward.remove', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardRemoveMessage(data) - ) - ]) - }) - }) - .on('logcat.start', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.LogcatStartMessage(data) - ) - ]) - }) - .on('logcat.stop', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.LogcatStopMessage() - ) - ]) - }) - .on('browser.open', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.BrowserOpenMessage(data) - ) - ]) - }) - .on('browser.clear', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.BrowserClearMessage(data) - ) - ]) - }) - .on('store.open', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.StoreOpenMessage() - ) - ]) - }) - .on('screen.capture', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ScreenCaptureMessage() - ) - ]) - }) - }) - .finally(function() { - // Clean up all listeners and subscriptions - channelRouter.removeListener(wireutil.global, messageListener) - channels.forEach(function(channel) { - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - }) - }) - .catch(function(err) { - // Cannot guarantee integrity of client - log.error( - 'Client had an error, disconnecting due to probable loss of integrity' - , err.stack - ) - - socket.disconnect(true) - }) - - - /* - socket.on('flick', function(data) {}) - socket.on('back', function(data) {}) - socket.on('forward', function(data) {}) - socket.on('refresh', function(data) {}) - socket.on('internal.relaunch', function(data) {}) - socket.on('internal.clear', function(data) {}) - socket.on('selenium.setCookie', function(data) {}) - socket.on('selenium.deleteCookie', function(data) {}) - socket.on('selenium.deleteAllCookies', function(data) {}) - socket.on('debug.benchmark.pull.start', function(data) {}) - socket.on('debug.benchmark.pull.stop', function(data) {}) - socket.on('logcat', function(data) {}) - socket.on('debug.benchmark.pull.rate', function(data) {}) - socket.on('cpu.monitor.load', function(data) {}) - - socket.on('safeExecute', function(data) {}) - socket.on('eval', function(data) {}) - socket.on('safeEval', function(data) {}) - socket.on('executeAsync', function(data) {}) - socket.on('safeExecuteAsync', function(data) {}) - socket.on('execute', function(data) {}) - socket.on('screenshot', function(data) {}) - socket.on('selenium.screenshot', function(data) {}) - socket.on('url', function(data) {}) - socket.on('selenium.allCookies', function(data) {}) - */ - //this._react 'selenium.weinre', => - // this._runTransaction 'selenium.weinre', - // targetHost: conf.weinre.httpHost - // targetPort: conf.weinre.httpPort - }) - server.listen(options.port) log.info('Listening on port %d', options.port) } diff --git a/lib/middleware/auth.js b/lib/roles/app/middleware/auth.js similarity index 90% rename from lib/middleware/auth.js rename to lib/roles/app/middleware/auth.js index 2da5c431..92984c5c 100644 --- a/lib/middleware/auth.js +++ b/lib/roles/app/middleware/auth.js @@ -1,7 +1,7 @@ -var jwtutil = require('../util/jwtutil') -var urlutil = require('../util/urlutil') +var jwtutil = require('../../../util/jwtutil') +var urlutil = require('../../../util/urlutil') -var dbapi = require('../db/api') +var dbapi = require('../../../db/api') module.exports = function(options) { return function(req, res, next) { diff --git a/lib/middleware/browser-icons.js b/lib/roles/app/middleware/browser-icons.js similarity index 75% rename from lib/middleware/browser-icons.js rename to lib/roles/app/middleware/browser-icons.js index e0b571a2..aefe027f 100644 --- a/lib/middleware/browser-icons.js +++ b/lib/roles/app/middleware/browser-icons.js @@ -1,6 +1,6 @@ var express = require('express') -var pathutil = require('../util/pathutil') +var pathutil = require('../../../util/pathutil') module.exports = function() { return express.static( diff --git a/lib/middleware/device-icons.js b/lib/roles/app/middleware/device-icons.js similarity index 74% rename from lib/middleware/device-icons.js rename to lib/roles/app/middleware/device-icons.js index bc604ade..c875c356 100644 --- a/lib/middleware/device-icons.js +++ b/lib/roles/app/middleware/device-icons.js @@ -1,6 +1,6 @@ var express = require('express') -var pathutil = require('../util/pathutil') +var pathutil = require('../../../util/pathutil') module.exports = function() { return express.static( diff --git a/lib/middleware/webpack.js b/lib/roles/app/middleware/webpack.js similarity index 94% rename from lib/middleware/webpack.js rename to lib/roles/app/middleware/webpack.js index 5bd4243f..215468e4 100644 --- a/lib/middleware/webpack.js +++ b/lib/roles/app/middleware/webpack.js @@ -9,9 +9,9 @@ var MemoryOutputFileSystem = require('webpack/lib/MemoryOutputFileSystem') var MemoryInputFileSystem = require('webpack/node_modules/enhanced-resolve/lib/MemoryInputFileSystem') -var logger = require('../util/logger') -var lifecycle = require('../util/lifecycle') -var globalOptions = require('../../webpack.config') +var logger = require('../../../util/logger') +var lifecycle = require('../../../util/lifecycle') +var globalOptions = require('../../../../webpack.config') // Similar to webpack-dev-middleware, but integrates with our custom // lifecycle, behaves more like normal express middleware, and removes diff --git a/lib/roles/websocket.js b/lib/roles/websocket.js new file mode 100644 index 00000000..b322b4cf --- /dev/null +++ b/lib/roles/websocket.js @@ -0,0 +1,570 @@ +var http = require('http') +var events = require('events') +var util = require('util') + +var socketio = require('socket.io') +var zmq = require('zmq') +var Promise = require('bluebird') +var _ = require('lodash') +var request = Promise.promisifyAll(require('request')) + +var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../wire/util') +var wirerouter = require('../wire/router') +var dbapi = require('../db/api') +var datautil = require('../util/datautil') +var cookieSession = require('./websocket/middleware/cookie-session') +var ip = require('./websocket/middleware/remote-ip') +var auth = require('./websocket/middleware/auth') + +module.exports = function(options) { + var log = logger.createLogger('websocket') + , server = http.createServer() + , io = socketio.listen(server, { + serveClient: false + , transports: ['websocket'] + }) + , channelRouter = new events.EventEmitter() + + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + + // Input + var sub = zmq.socket('sub') + options.endpoints.sub.forEach(function(endpoint) { + log.info('Receiving input from %s', endpoint) + sub.connect(endpoint) + }) + + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + + sub.on('message', function(channel, data) { + channelRouter.emit(channel.toString(), channel, data) + }) + + io.use(cookieSession({ + name: options.ssid + , keys: [options.secret] + })) + + io.use(ip({ + trust: function() { + return true + } + })) + + io.use(auth) + + io.on('connection', function(socket) { + var channels = [] + , user = socket.request.user + + socket.emit('socket.ip', socket.request.ip) + + function joinChannel(channel) { + channels.push(channel) + channelRouter.on(channel, messageListener) + sub.subscribe(channel) + } + + function leaveChannel(channel) { + _.pull(channels, channel) + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + } + + function createTouchHandler(Klass) { + return function(channel, data) { + push.send([ + channel + , wireutil.envelope(new Klass( + data.seq + , data.x + , data.y + )) + ]) + } + } + + function createKeyHandler(Klass) { + return function(channel, data) { + push.send([ + channel + , wireutil.envelope(new Klass( + data.key + )) + ]) + } + } + + var messageListener = wirerouter() + .on(wire.DeviceLogMessage, function(channel, message) { + socket.emit('device.log', message) + }) + .on(wire.DevicePresentMessage, function(channel, message) { + socket.emit('device.add', { + important: true + , data: { + serial: message.serial + , present: true + } + }) + }) + .on(wire.DeviceAbsentMessage, function(channel, message) { + socket.emit('device.remove', { + important: true + , data: { + serial: message.serial + , present: false + , ready: false + , lastHeartbeatAt: null + , using: false + } + }) + }) + .on(wire.JoinGroupMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: datautil.applyOwner({ + serial: message.serial + , owner: message.owner + } + , user + ) + }) + }) + .on(wire.LeaveGroupMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: datautil.applyOwner({ + serial: message.serial + , owner: null + } + , user + ) + }) + }) + .on(wire.DeviceStatusMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: message + }) + }) + .on(wire.DeviceIdentityMessage, function(channel, message) { + datautil.applyData(message) + message.ready = true + socket.emit('device.change', { + important: true + , data: message + }) + }) + .on(wire.TransactionProgressMessage, function(channel, message) { + socket.emit('tx.progress', channel.toString(), message) + }) + .on(wire.TransactionDoneMessage, function(channel, message) { + socket.emit('tx.done', channel.toString(), message) + }) + .on(wire.DeviceLogcatEntryMessage, function(channel, message) { + socket.emit('logcat.entry', message) + }) + .on(wire.AirplaneModeEvent, function(channel, message) { + socket.emit('device.change', { + important: true + , data: { + serial: message.serial + , airplaneMode: message.enabled + } + }) + }) + .on(wire.BatteryEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , battery: message + } + }) + }) + .on(wire.DeviceBrowserMessage, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: true + , data: datautil.applyBrowsers({ + serial: serial + , browser: message + }) + }) + }) + .on(wire.ConnectivityEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , network: message + } + }) + }) + .on(wire.PhoneStateEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , network: message + } + }) + }) + .on(wire.RotationEvent, function(channel, message) { + socket.emit('device.change', { + important: false + , data: { + serial: message.serial + , display: { + orientation: message.rotation + } + } + }) + }) + .handler() + + // Global messages + // + // @todo Use socket.io to push global events to all clients instead + // of listening on every connection, otherwise we're very likely to + // hit EventEmitter's leak complaints (plus it's more work) + channelRouter.on(wireutil.global, messageListener) + + // User's private group + joinChannel(user.group) + + new Promise(function(resolve) { + socket.on('disconnect', resolve) + // Touch events + .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) + .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) + .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) + .on('input.tap', createTouchHandler(wire.TapMessage)) + // Key events + .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) + .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) + .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) + .on('input.type', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.TypeMessage( + data.text + )) + ]) + }) + .on('display.rotate', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.RotateMessage( + data.rotation + )) + ]) + }) + // Transactions + .on('clipboard.paste', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.PasteMessage(data.text) + ) + ]) + }) + .on('clipboard.copy', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.CopyMessage() + ) + ]) + }) + .on('device.identify', function(channel, responseChannel) { + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.PhysicalIdentifyMessage() + ) + ]) + }) + .on('group.invite', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.GroupMessage( + new wire.OwnerMessage( + user.email + , user.name + , user.group + ) + , data.timeout || null + , wireutil.toDeviceRequirements(data.requirements) + ) + ) + ]) + }) + .on('group.kick', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.UngroupMessage( + wireutil.toDeviceRequirements(data.requirements) + ) + ) + ]) + }) + .on('tx.cleanup', function(channel) { + leaveChannel(channel) + }) + .on('tx.punch', function(channel) { + joinChannel(channel) + socket.emit('tx.punch', channel) + }) + .on('shell.command', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ShellCommandMessage(data) + ) + ]) + }) + .on('shell.keepalive', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) + ]) + }) + .on('device.install', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.InstallMessage( + data.href + , data.launch === true + , JSON.stringify(data.manifest) + ) + ) + ]) + }) + .on('device.uninstall', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.UninstallMessage(data) + ) + ]) + }) + .on('storage.upload', function(channel, responseChannel, data) { + joinChannel(responseChannel) + request.postAsync({ + url: util.format( + '%sapi/v1/resources?channel=%s' + , options.storageUrl + , responseChannel + ) + , json: true + , body: { + url: data.url + } + }) + .catch(function(err) { + log.error('Storage upload had an error', err.stack) + leaveChannel(responseChannel) + socket.emit('tx.cancel', responseChannel, { + success: false + , data: 'fail_upload' + }) + }) + }) + .on('forward.test', function(channel, responseChannel, data) { + joinChannel(responseChannel) + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = socket.request.ip + } + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardTestMessage(data) + ) + ]) + }) + .on('forward.create', function(channel, responseChannel, data) { + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = socket.request.ip + } + dbapi.addUserForward(user.email, data) + .then(function() { + socket.emit('forward.create', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardCreateMessage(data) + ) + ]) + }) + }) + .on('forward.remove', function(channel, responseChannel, data) { + dbapi.removeUserForward(user.email, data.devicePort) + .then(function() { + socket.emit('forward.remove', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardRemoveMessage(data) + ) + ]) + }) + }) + .on('logcat.start', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.LogcatStartMessage(data) + ) + ]) + }) + .on('logcat.stop', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.LogcatStopMessage() + ) + ]) + }) + .on('browser.open', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.BrowserOpenMessage(data) + ) + ]) + }) + .on('browser.clear', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.BrowserClearMessage(data) + ) + ]) + }) + .on('store.open', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.StoreOpenMessage() + ) + ]) + }) + .on('screen.capture', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ScreenCaptureMessage() + ) + ]) + }) + }) + .finally(function() { + // Clean up all listeners and subscriptions + channelRouter.removeListener(wireutil.global, messageListener) + channels.forEach(function(channel) { + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + }) + }) + .catch(function(err) { + // Cannot guarantee integrity of client + log.error( + 'Client had an error, disconnecting due to probable loss of integrity' + , err.stack + ) + + socket.disconnect(true) + }) + + + /* + socket.on('flick', function(data) {}) + socket.on('back', function(data) {}) + socket.on('forward', function(data) {}) + socket.on('refresh', function(data) {}) + socket.on('internal.relaunch', function(data) {}) + socket.on('internal.clear', function(data) {}) + socket.on('selenium.setCookie', function(data) {}) + socket.on('selenium.deleteCookie', function(data) {}) + socket.on('selenium.deleteAllCookies', function(data) {}) + socket.on('debug.benchmark.pull.start', function(data) {}) + socket.on('debug.benchmark.pull.stop', function(data) {}) + socket.on('logcat', function(data) {}) + socket.on('debug.benchmark.pull.rate', function(data) {}) + socket.on('cpu.monitor.load', function(data) {}) + + socket.on('safeExecute', function(data) {}) + socket.on('eval', function(data) {}) + socket.on('safeEval', function(data) {}) + socket.on('executeAsync', function(data) {}) + socket.on('safeExecuteAsync', function(data) {}) + socket.on('execute', function(data) {}) + socket.on('screenshot', function(data) {}) + socket.on('selenium.screenshot', function(data) {}) + socket.on('url', function(data) {}) + socket.on('selenium.allCookies', function(data) {}) + */ + //this._react 'selenium.weinre', => + // this._runTransaction 'selenium.weinre', + // targetHost: conf.weinre.httpHost + // targetPort: conf.weinre.httpPort + }) + + server.listen(options.port) + log.info('Listening on port %d', options.port) +} diff --git a/lib/roles/websocket/middleware/auth.js b/lib/roles/websocket/middleware/auth.js new file mode 100644 index 00000000..bebcb451 --- /dev/null +++ b/lib/roles/websocket/middleware/auth.js @@ -0,0 +1,22 @@ +var dbapi = require('../../../db/api') + +module.exports = function(socket, next) { + var req = socket.request + , token = req.session.jwt + if (token) { + return dbapi.loadUser(token.email) + .then(function(user) { + if (user) { + req.user = user + next() + } + else { + next(new Error('Invalid user')) + } + }) + .catch(next) + } + else { + next(new Error('Missing authorization token')) + } +} diff --git a/lib/roles/websocket/middleware/cookie-session.js b/lib/roles/websocket/middleware/cookie-session.js new file mode 100644 index 00000000..14132828 --- /dev/null +++ b/lib/roles/websocket/middleware/cookie-session.js @@ -0,0 +1,10 @@ +var cookieSession = require('cookie-session') + +module.exports = function(options) { + var session = cookieSession(options) + return function(socket, next) { + var req = socket.request + , res = Object.create(null) + session(req, res, next) + } +} diff --git a/lib/roles/websocket/middleware/remote-ip.js b/lib/roles/websocket/middleware/remote-ip.js new file mode 100644 index 00000000..9f25b839 --- /dev/null +++ b/lib/roles/websocket/middleware/remote-ip.js @@ -0,0 +1,9 @@ +var proxyaddr = require('proxy-addr') + +module.exports = function(options) { + return function(socket, next) { + var req = socket.request + req.ip = proxyaddr(req, options.trust) + next() + } +} diff --git a/res/app/components/stf/socket/socket-service.js b/res/app/components/stf/socket/socket-service.js index d1b07ca1..8e473464 100644 --- a/res/app/components/stf/socket/socket-service.js +++ b/res/app/components/stf/socket/socket-service.js @@ -1,7 +1,9 @@ var io = require('socket.io') module.exports = function SocketFactory($rootScope, VersionUpdateService) { - var socket = io('/', { + /*globals APP:false*/ + + var socket = io(APP.websocketUrl, { reconnection: false , transports: ['websocket'] }) diff --git a/res/app/views/index.jade b/res/app/views/index.jade index 75367ee5..b130cc61 100644 --- a/res/app/views/index.jade +++ b/res/app/views/index.jade @@ -22,5 +22,6 @@ html div(growl) div(ng-view).fill-height + script var APP = !{JSON.stringify(APP)} script(src='/static/build/bundle.js') script(src='/static/bower_components/stf-analytics/analytics.js')