diff --git a/doc/topo-v1.ditaa b/doc/topo-v1.ditaa index 047bf79a..bbbabd2d 100644 --- a/doc/topo-v1.ditaa +++ b/doc/topo-v1.ditaa @@ -1,5 +1,5 @@ /------------\ /------------\ /------------\ - | app | | app | | app | + | websocket | | websocket | | websocket | |------------| |------------| |------------| x N | PUSH | SUB | | PUSH | SUB | | PUSH | SUB | \------------/ \------------/ \------------/ diff --git a/lib/cli.js b/lib/cli.js index 91d9964e..ff098236 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -402,6 +402,9 @@ program .option('-a, --auth-url ' , 'URL to auth client' , String) + .option('-w, --websocket-url ' + , 'URL to websocket client' + , String) .option('-r, --storage-url ' , 'URL to storage client' , String) @@ -411,12 +414,6 @@ program .option('--storage-plugin-apk-url ' , 'URL to apk storage plugin' , String) - .option('-u, --connect-sub ' - , 'sub endpoint' - , cliutil.list) - .option('-c, --connect-push ' - , 'push endpoint' - , cliutil.list) .option('-d, --disable-watch' , 'disable watching resources') .action(function(options) { @@ -426,6 +423,9 @@ program if (!options.authUrl) { this.missingArgument('--auth-url') } + if (!options.websocketUrl) { + this.missingArgument('--websocket-url') + } if (!options.storageUrl) { this.missingArgument('--storage-url') } @@ -435,6 +435,51 @@ program if (!options.storagePluginApkUrl) { this.missingArgument('--storage-plugin-apk-url') } + + require('./roles/app')({ + port: options.port + , secret: options.secret + , ssid: options.ssid + , authUrl: options.authUrl + , websocketUrl: options.websocketUrl + , storageUrl: options.storageUrl + , storagePluginImageUrl: options.storagePluginImageUrl + , storagePluginApkUrl: options.storagePluginApkUrl + , disableWatch: options.disableWatch + }) + }) + +program + .command('websocket') + .description('start websocket') + .option('-p, --port ' + , 'port (or $PORT)' + , Number + , process.env.PORT || 7110) + .option('-s, --secret ' + , 'secret (or $SECRET)' + , String + , process.env.SECRET) + .option('-i, --ssid ' + , 'session SSID (or $SSID)' + , String + , process.env.SSID || 'ssid') + .option('-r, --storage-url ' + , 'URL to storage client' + , String) + .option('-u, --connect-sub ' + , 'sub endpoint' + , cliutil.list) + .option('-c, --connect-push ' + , 'push endpoint' + , cliutil.list) + .action(function(options) { + if (!options.secret) { + this.missingArgument('--secret') + } + if (!options.storageUrl) { + this.missingArgument('--storage-url') + } if (!options.connectSub) { this.missingArgument('--connect-sub') } @@ -442,19 +487,15 @@ program this.missingArgument('--connect-push') } - require('./roles/app')({ + require('./roles/websocket')({ port: options.port , secret: options.secret , ssid: options.ssid - , authUrl: options.authUrl , storageUrl: options.storageUrl - , storagePluginImageUrl: options.storagePluginImageUrl - , storagePluginApkUrl: options.storagePluginApkUrl , endpoints: { sub: options.connectSub , push: options.connectPush } - , disableWatch: options.disableWatch }) }) @@ -621,6 +662,10 @@ program , 'app port' , Number , 7100) + .option('--websocket-port ' + , 'websocket port' + , Number + , 7110) .option('--storage-port ' , 'storage port' , Number @@ -729,14 +774,17 @@ program , options.publicIp , options.authPort ) + , '--websocket-url', util.format( + 'http://%s:%d/' + , options.publicIp + , options.websocketPort + ) , '--storage-url' , util.format('http://localhost:%d/', options.storagePort) , '--storage-plugin-image-url' , util.format('http://localhost:%d/', options.storagePluginImagePort) , '--storage-plugin-apk-url' , util.format('http://localhost:%d/', options.storagePluginApkPort) - , '--connect-sub', options.bindAppPub - , '--connect-push', options.bindAppPull ].concat((function() { var extra = [] if (options.disableWatch) { @@ -745,6 +793,17 @@ program return extra })())) + // websocket + , procutil.fork(__filename, [ + 'websocket' + , '--port', options.websocketPort + , '--secret', options.authSecret + , '--storage-url' + , util.format('http://localhost:%d/', options.storagePort) + , '--connect-sub', options.bindAppPub + , '--connect-push', options.bindAppPull + ]) + // storage , procutil.fork(__filename, [ 'storage-temp' diff --git a/lib/roles/app.js b/lib/roles/app.js index f06b78e3..2889cf91 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -1,7 +1,5 @@ var http = require('http') -var events = require('events') var path = require('path') -var util = require('util') var express = require('express') var validator = require('express-validator') @@ -10,38 +8,24 @@ var bodyParser = require('body-parser') var serveFavicon = require('serve-favicon') var serveStatic = require('serve-static') var csrf = require('csurf') -var socketio = require('socket.io') -var zmq = require('zmq') var Promise = require('bluebird') var httpProxy = require('http-proxy') -var _ = require('lodash') -var request = Promise.promisifyAll(require('request')) -var proxyaddr = require('proxy-addr') var logger = require('../util/logger') var pathutil = require('../util/pathutil') -var wire = require('../wire') -var wireutil = require('../wire/util') -var wirerouter = require('../wire/router') var dbapi = require('../db/api') var datautil = require('../util/datautil') -var auth = require('../middleware/auth') -var webpack = require('../middleware/webpack') -var deviceIconMiddleware = require('../middleware/device-icons') -var browserIconMiddleware = require('../middleware/browser-icons') +var auth = require('./app/middleware/auth') +var webpack = require('./app/middleware/webpack') +var deviceIconMiddleware = require('./app/middleware/device-icons') +var browserIconMiddleware = require('./app/middleware/browser-icons') module.exports = function(options) { var log = logger.createLogger('app') , app = express() , server = http.createServer(app) - , io = socketio.listen(server, { - serveClient: false - , transports: ['websocket'] - }) - , channelRouter = new events.EventEmitter() , proxy = httpProxy.createProxyServer() - , sessionMiddleware proxy.on('error', function(err) { log.error('Proxy had an error', err.stack) @@ -79,15 +63,21 @@ module.exports = function(options) { app.use(serveFavicon(pathutil.resource( 'bower_components/stf-graphics/logo/exports/STF-128.png'))) - app.use(sessionMiddleware = cookieSession({ + app.use(cookieSession({ name: options.ssid , keys: [options.secret] })) + app.use(auth({ secret: options.secret , authUrl: options.authUrl })) + // Variables for templates + app.locals.APP = { + websocketUrl: options.websocketUrl + } + // Proxied requests must come before any body parsers. These proxies are // here mainly for convenience, they should be replaced with proper reverse // proxies in production. @@ -113,30 +103,6 @@ module.exports = function(options) { app.use(csrf()) app.use(validator()) - // Output - var push = zmq.socket('push') - options.endpoints.push.forEach(function(endpoint) { - log.info('Sending output to %s', endpoint) - push.connect(endpoint) - }) - - // Input - var sub = zmq.socket('sub') - options.endpoints.sub.forEach(function(endpoint) { - log.info('Receiving input from %s', endpoint) - sub.connect(endpoint) - }) - - // Establish always-on channels - ;[wireutil.global].forEach(function(channel) { - log.info('Subscribing to permanent channel "%s"', channel) - sub.subscribe(channel) - }) - - sub.on('message', function(channel, data) { - channelRouter.emit(channel.toString(), channel, data) - }) - app.get('/partials/*', function(req, res) { var whitelist = { 'devices/index': true @@ -232,541 +198,6 @@ module.exports = function(options) { }) }) - io.use(function(socket, next) { - var req = socket.request - , res = Object.create(null) - sessionMiddleware(req, res, next) - }) - - io.use(function(socket, next) { - var req = socket.request - // This is similar to what Express does behind the scenes - req.ip = proxyaddr(req, app.get('trust proxy fn')) - next() - }) - - io.use(function(socket, next) { - var req = socket.request - , token = req.session.jwt - if (token) { - return dbapi.loadUser(token.email) - .then(function(user) { - if (user) { - req.user = user - next() - } - else { - next(new Error('Invalid user')) - } - }) - .catch(next) - } - else { - next(new Error('Missing authorization token')) - } - }) - - io.on('connection', function(socket) { - var channels = [] - , user = socket.request.user - - socket.emit('socket.ip', socket.request.ip) - - function joinChannel(channel) { - channels.push(channel) - channelRouter.on(channel, messageListener) - sub.subscribe(channel) - } - - function leaveChannel(channel) { - _.pull(channels, channel) - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - } - - function createTouchHandler(Klass) { - return function(channel, data) { - push.send([ - channel - , wireutil.envelope(new Klass( - data.seq - , data.x - , data.y - )) - ]) - } - } - - function createKeyHandler(Klass) { - return function(channel, data) { - push.send([ - channel - , wireutil.envelope(new Klass( - data.key - )) - ]) - } - } - - var messageListener = wirerouter() - .on(wire.DeviceLogMessage, function(channel, message) { - socket.emit('device.log', message) - }) - .on(wire.DevicePresentMessage, function(channel, message) { - socket.emit('device.add', { - important: true - , data: { - serial: message.serial - , present: true - } - }) - }) - .on(wire.DeviceAbsentMessage, function(channel, message) { - socket.emit('device.remove', { - important: true - , data: { - serial: message.serial - , present: false - , ready: false - , lastHeartbeatAt: null - , using: false - } - }) - }) - .on(wire.JoinGroupMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: datautil.applyOwner({ - serial: message.serial - , owner: message.owner - } - , user - ) - }) - }) - .on(wire.LeaveGroupMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: datautil.applyOwner({ - serial: message.serial - , owner: null - } - , user - ) - }) - }) - .on(wire.DeviceStatusMessage, function(channel, message) { - socket.emit('device.change', { - important: true - , data: message - }) - }) - .on(wire.DeviceIdentityMessage, function(channel, message) { - datautil.applyData(message) - message.ready = true - socket.emit('device.change', { - important: true - , data: message - }) - }) - .on(wire.TransactionProgressMessage, function(channel, message) { - socket.emit('tx.progress', channel.toString(), message) - }) - .on(wire.TransactionDoneMessage, function(channel, message) { - socket.emit('tx.done', channel.toString(), message) - }) - .on(wire.DeviceLogcatEntryMessage, function(channel, message) { - socket.emit('logcat.entry', message) - }) - .on(wire.AirplaneModeEvent, function(channel, message) { - socket.emit('device.change', { - important: true - , data: { - serial: message.serial - , airplaneMode: message.enabled - } - }) - }) - .on(wire.BatteryEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , battery: message - } - }) - }) - .on(wire.DeviceBrowserMessage, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: true - , data: datautil.applyBrowsers({ - serial: serial - , browser: message - }) - }) - }) - .on(wire.ConnectivityEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , network: message - } - }) - }) - .on(wire.PhoneStateEvent, function(channel, message) { - var serial = message.serial - delete message.serial - socket.emit('device.change', { - important: false - , data: { - serial: serial - , network: message - } - }) - }) - .on(wire.RotationEvent, function(channel, message) { - socket.emit('device.change', { - important: false - , data: { - serial: message.serial - , display: { - orientation: message.rotation - } - } - }) - }) - .handler() - - // Global messages - // - // @todo Use socket.io to push global events to all clients instead - // of listening on every connection, otherwise we're very likely to - // hit EventEmitter's leak complaints (plus it's more work) - channelRouter.on(wireutil.global, messageListener) - - // User's private group - joinChannel(user.group) - - new Promise(function(resolve) { - socket.on('disconnect', resolve) - // Touch events - .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) - .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) - .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) - .on('input.tap', createTouchHandler(wire.TapMessage)) - // Key events - .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) - .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) - .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) - .on('input.type', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.TypeMessage( - data.text - )) - ]) - }) - .on('display.rotate', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.RotateMessage( - data.rotation - )) - ]) - }) - // Transactions - .on('clipboard.paste', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.PasteMessage(data.text) - ) - ]) - }) - .on('clipboard.copy', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.CopyMessage() - ) - ]) - }) - .on('device.identify', function(channel, responseChannel) { - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.PhysicalIdentifyMessage() - ) - ]) - }) - .on('group.invite', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.GroupMessage( - new wire.OwnerMessage( - user.email - , user.name - , user.group - ) - , data.timeout || null - , wireutil.toDeviceRequirements(data.requirements) - ) - ) - ]) - }) - .on('group.kick', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.UngroupMessage( - wireutil.toDeviceRequirements(data.requirements) - ) - ) - ]) - }) - .on('tx.cleanup', function(channel) { - leaveChannel(channel) - }) - .on('tx.punch', function(channel) { - joinChannel(channel) - socket.emit('tx.punch', channel) - }) - .on('shell.command', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ShellCommandMessage(data) - ) - ]) - }) - .on('shell.keepalive', function(channel, data) { - push.send([ - channel - , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) - ]) - }) - .on('device.install', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.InstallMessage( - data.href - , data.launch === true - , JSON.stringify(data.manifest) - ) - ) - ]) - }) - .on('device.uninstall', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.UninstallMessage(data) - ) - ]) - }) - .on('storage.upload', function(channel, responseChannel, data) { - joinChannel(responseChannel) - request.postAsync({ - url: util.format( - '%sapi/v1/resources?channel=%s' - , options.storageUrl - , responseChannel - ) - , json: true - , body: { - url: data.url - } - }) - .catch(function(err) { - log.error('Storage upload had an error', err.stack) - leaveChannel(responseChannel) - socket.emit('tx.cancel', responseChannel, { - success: false - , data: 'fail_upload' - }) - }) - }) - .on('forward.test', function(channel, responseChannel, data) { - joinChannel(responseChannel) - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = socket.request.ip - } - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardTestMessage(data) - ) - ]) - }) - .on('forward.create', function(channel, responseChannel, data) { - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = socket.request.ip - } - dbapi.addUserForward(user.email, data) - .then(function() { - socket.emit('forward.create', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardCreateMessage(data) - ) - ]) - }) - }) - .on('forward.remove', function(channel, responseChannel, data) { - dbapi.removeUserForward(user.email, data.devicePort) - .then(function() { - socket.emit('forward.remove', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardRemoveMessage(data) - ) - ]) - }) - }) - .on('logcat.start', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.LogcatStartMessage(data) - ) - ]) - }) - .on('logcat.stop', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.LogcatStopMessage() - ) - ]) - }) - .on('browser.open', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.BrowserOpenMessage(data) - ) - ]) - }) - .on('browser.clear', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.BrowserClearMessage(data) - ) - ]) - }) - .on('store.open', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.StoreOpenMessage() - ) - ]) - }) - .on('screen.capture', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ScreenCaptureMessage() - ) - ]) - }) - }) - .finally(function() { - // Clean up all listeners and subscriptions - channelRouter.removeListener(wireutil.global, messageListener) - channels.forEach(function(channel) { - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - }) - }) - .catch(function(err) { - // Cannot guarantee integrity of client - log.error( - 'Client had an error, disconnecting due to probable loss of integrity' - , err.stack - ) - - socket.disconnect(true) - }) - - - /* - socket.on('flick', function(data) {}) - socket.on('back', function(data) {}) - socket.on('forward', function(data) {}) - socket.on('refresh', function(data) {}) - socket.on('internal.relaunch', function(data) {}) - socket.on('internal.clear', function(data) {}) - socket.on('selenium.setCookie', function(data) {}) - socket.on('selenium.deleteCookie', function(data) {}) - socket.on('selenium.deleteAllCookies', function(data) {}) - socket.on('debug.benchmark.pull.start', function(data) {}) - socket.on('debug.benchmark.pull.stop', function(data) {}) - socket.on('logcat', function(data) {}) - socket.on('debug.benchmark.pull.rate', function(data) {}) - socket.on('cpu.monitor.load', function(data) {}) - - socket.on('safeExecute', function(data) {}) - socket.on('eval', function(data) {}) - socket.on('safeEval', function(data) {}) - socket.on('executeAsync', function(data) {}) - socket.on('safeExecuteAsync', function(data) {}) - socket.on('execute', function(data) {}) - socket.on('screenshot', function(data) {}) - socket.on('selenium.screenshot', function(data) {}) - socket.on('url', function(data) {}) - socket.on('selenium.allCookies', function(data) {}) - */ - //this._react 'selenium.weinre', => - // this._runTransaction 'selenium.weinre', - // targetHost: conf.weinre.httpHost - // targetPort: conf.weinre.httpPort - }) - server.listen(options.port) log.info('Listening on port %d', options.port) } diff --git a/lib/middleware/auth.js b/lib/roles/app/middleware/auth.js similarity index 90% rename from lib/middleware/auth.js rename to lib/roles/app/middleware/auth.js index 2da5c431..92984c5c 100644 --- a/lib/middleware/auth.js +++ b/lib/roles/app/middleware/auth.js @@ -1,7 +1,7 @@ -var jwtutil = require('../util/jwtutil') -var urlutil = require('../util/urlutil') +var jwtutil = require('../../../util/jwtutil') +var urlutil = require('../../../util/urlutil') -var dbapi = require('../db/api') +var dbapi = require('../../../db/api') module.exports = function(options) { return function(req, res, next) { diff --git a/lib/middleware/browser-icons.js b/lib/roles/app/middleware/browser-icons.js similarity index 75% rename from lib/middleware/browser-icons.js rename to lib/roles/app/middleware/browser-icons.js index e0b571a2..aefe027f 100644 --- a/lib/middleware/browser-icons.js +++ b/lib/roles/app/middleware/browser-icons.js @@ -1,6 +1,6 @@ var express = require('express') -var pathutil = require('../util/pathutil') +var pathutil = require('../../../util/pathutil') module.exports = function() { return express.static( diff --git a/lib/middleware/device-icons.js b/lib/roles/app/middleware/device-icons.js similarity index 74% rename from lib/middleware/device-icons.js rename to lib/roles/app/middleware/device-icons.js index bc604ade..c875c356 100644 --- a/lib/middleware/device-icons.js +++ b/lib/roles/app/middleware/device-icons.js @@ -1,6 +1,6 @@ var express = require('express') -var pathutil = require('../util/pathutil') +var pathutil = require('../../../util/pathutil') module.exports = function() { return express.static( diff --git a/lib/middleware/webpack.js b/lib/roles/app/middleware/webpack.js similarity index 94% rename from lib/middleware/webpack.js rename to lib/roles/app/middleware/webpack.js index 5bd4243f..215468e4 100644 --- a/lib/middleware/webpack.js +++ b/lib/roles/app/middleware/webpack.js @@ -9,9 +9,9 @@ var MemoryOutputFileSystem = require('webpack/lib/MemoryOutputFileSystem') var MemoryInputFileSystem = require('webpack/node_modules/enhanced-resolve/lib/MemoryInputFileSystem') -var logger = require('../util/logger') -var lifecycle = require('../util/lifecycle') -var globalOptions = require('../../webpack.config') +var logger = require('../../../util/logger') +var lifecycle = require('../../../util/lifecycle') +var globalOptions = require('../../../../webpack.config') // Similar to webpack-dev-middleware, but integrates with our custom // lifecycle, behaves more like normal express middleware, and removes diff --git a/lib/roles/websocket.js b/lib/roles/websocket.js new file mode 100644 index 00000000..b322b4cf --- /dev/null +++ b/lib/roles/websocket.js @@ -0,0 +1,570 @@ +var http = require('http') +var events = require('events') +var util = require('util') + +var socketio = require('socket.io') +var zmq = require('zmq') +var Promise = require('bluebird') +var _ = require('lodash') +var request = Promise.promisifyAll(require('request')) + +var logger = require('../util/logger') +var wire = require('../wire') +var wireutil = require('../wire/util') +var wirerouter = require('../wire/router') +var dbapi = require('../db/api') +var datautil = require('../util/datautil') +var cookieSession = require('./websocket/middleware/cookie-session') +var ip = require('./websocket/middleware/remote-ip') +var auth = require('./websocket/middleware/auth') + +module.exports = function(options) { + var log = logger.createLogger('websocket') + , server = http.createServer() + , io = socketio.listen(server, { + serveClient: false + , transports: ['websocket'] + }) + , channelRouter = new events.EventEmitter() + + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + + // Input + var sub = zmq.socket('sub') + options.endpoints.sub.forEach(function(endpoint) { + log.info('Receiving input from %s', endpoint) + sub.connect(endpoint) + }) + + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + + sub.on('message', function(channel, data) { + channelRouter.emit(channel.toString(), channel, data) + }) + + io.use(cookieSession({ + name: options.ssid + , keys: [options.secret] + })) + + io.use(ip({ + trust: function() { + return true + } + })) + + io.use(auth) + + io.on('connection', function(socket) { + var channels = [] + , user = socket.request.user + + socket.emit('socket.ip', socket.request.ip) + + function joinChannel(channel) { + channels.push(channel) + channelRouter.on(channel, messageListener) + sub.subscribe(channel) + } + + function leaveChannel(channel) { + _.pull(channels, channel) + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + } + + function createTouchHandler(Klass) { + return function(channel, data) { + push.send([ + channel + , wireutil.envelope(new Klass( + data.seq + , data.x + , data.y + )) + ]) + } + } + + function createKeyHandler(Klass) { + return function(channel, data) { + push.send([ + channel + , wireutil.envelope(new Klass( + data.key + )) + ]) + } + } + + var messageListener = wirerouter() + .on(wire.DeviceLogMessage, function(channel, message) { + socket.emit('device.log', message) + }) + .on(wire.DevicePresentMessage, function(channel, message) { + socket.emit('device.add', { + important: true + , data: { + serial: message.serial + , present: true + } + }) + }) + .on(wire.DeviceAbsentMessage, function(channel, message) { + socket.emit('device.remove', { + important: true + , data: { + serial: message.serial + , present: false + , ready: false + , lastHeartbeatAt: null + , using: false + } + }) + }) + .on(wire.JoinGroupMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: datautil.applyOwner({ + serial: message.serial + , owner: message.owner + } + , user + ) + }) + }) + .on(wire.LeaveGroupMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: datautil.applyOwner({ + serial: message.serial + , owner: null + } + , user + ) + }) + }) + .on(wire.DeviceStatusMessage, function(channel, message) { + socket.emit('device.change', { + important: true + , data: message + }) + }) + .on(wire.DeviceIdentityMessage, function(channel, message) { + datautil.applyData(message) + message.ready = true + socket.emit('device.change', { + important: true + , data: message + }) + }) + .on(wire.TransactionProgressMessage, function(channel, message) { + socket.emit('tx.progress', channel.toString(), message) + }) + .on(wire.TransactionDoneMessage, function(channel, message) { + socket.emit('tx.done', channel.toString(), message) + }) + .on(wire.DeviceLogcatEntryMessage, function(channel, message) { + socket.emit('logcat.entry', message) + }) + .on(wire.AirplaneModeEvent, function(channel, message) { + socket.emit('device.change', { + important: true + , data: { + serial: message.serial + , airplaneMode: message.enabled + } + }) + }) + .on(wire.BatteryEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , battery: message + } + }) + }) + .on(wire.DeviceBrowserMessage, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: true + , data: datautil.applyBrowsers({ + serial: serial + , browser: message + }) + }) + }) + .on(wire.ConnectivityEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , network: message + } + }) + }) + .on(wire.PhoneStateEvent, function(channel, message) { + var serial = message.serial + delete message.serial + socket.emit('device.change', { + important: false + , data: { + serial: serial + , network: message + } + }) + }) + .on(wire.RotationEvent, function(channel, message) { + socket.emit('device.change', { + important: false + , data: { + serial: message.serial + , display: { + orientation: message.rotation + } + } + }) + }) + .handler() + + // Global messages + // + // @todo Use socket.io to push global events to all clients instead + // of listening on every connection, otherwise we're very likely to + // hit EventEmitter's leak complaints (plus it's more work) + channelRouter.on(wireutil.global, messageListener) + + // User's private group + joinChannel(user.group) + + new Promise(function(resolve) { + socket.on('disconnect', resolve) + // Touch events + .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) + .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) + .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) + .on('input.tap', createTouchHandler(wire.TapMessage)) + // Key events + .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) + .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) + .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) + .on('input.type', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.TypeMessage( + data.text + )) + ]) + }) + .on('display.rotate', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.RotateMessage( + data.rotation + )) + ]) + }) + // Transactions + .on('clipboard.paste', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.PasteMessage(data.text) + ) + ]) + }) + .on('clipboard.copy', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.CopyMessage() + ) + ]) + }) + .on('device.identify', function(channel, responseChannel) { + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.PhysicalIdentifyMessage() + ) + ]) + }) + .on('group.invite', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.GroupMessage( + new wire.OwnerMessage( + user.email + , user.name + , user.group + ) + , data.timeout || null + , wireutil.toDeviceRequirements(data.requirements) + ) + ) + ]) + }) + .on('group.kick', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.UngroupMessage( + wireutil.toDeviceRequirements(data.requirements) + ) + ) + ]) + }) + .on('tx.cleanup', function(channel) { + leaveChannel(channel) + }) + .on('tx.punch', function(channel) { + joinChannel(channel) + socket.emit('tx.punch', channel) + }) + .on('shell.command', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ShellCommandMessage(data) + ) + ]) + }) + .on('shell.keepalive', function(channel, data) { + push.send([ + channel + , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) + ]) + }) + .on('device.install', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.InstallMessage( + data.href + , data.launch === true + , JSON.stringify(data.manifest) + ) + ) + ]) + }) + .on('device.uninstall', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.UninstallMessage(data) + ) + ]) + }) + .on('storage.upload', function(channel, responseChannel, data) { + joinChannel(responseChannel) + request.postAsync({ + url: util.format( + '%sapi/v1/resources?channel=%s' + , options.storageUrl + , responseChannel + ) + , json: true + , body: { + url: data.url + } + }) + .catch(function(err) { + log.error('Storage upload had an error', err.stack) + leaveChannel(responseChannel) + socket.emit('tx.cancel', responseChannel, { + success: false + , data: 'fail_upload' + }) + }) + }) + .on('forward.test', function(channel, responseChannel, data) { + joinChannel(responseChannel) + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = socket.request.ip + } + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardTestMessage(data) + ) + ]) + }) + .on('forward.create', function(channel, responseChannel, data) { + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = socket.request.ip + } + dbapi.addUserForward(user.email, data) + .then(function() { + socket.emit('forward.create', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardCreateMessage(data) + ) + ]) + }) + }) + .on('forward.remove', function(channel, responseChannel, data) { + dbapi.removeUserForward(user.email, data.devicePort) + .then(function() { + socket.emit('forward.remove', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardRemoveMessage(data) + ) + ]) + }) + }) + .on('logcat.start', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.LogcatStartMessage(data) + ) + ]) + }) + .on('logcat.stop', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.LogcatStopMessage() + ) + ]) + }) + .on('browser.open', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.BrowserOpenMessage(data) + ) + ]) + }) + .on('browser.clear', function(channel, responseChannel, data) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.BrowserClearMessage(data) + ) + ]) + }) + .on('store.open', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.StoreOpenMessage() + ) + ]) + }) + .on('screen.capture', function(channel, responseChannel) { + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ScreenCaptureMessage() + ) + ]) + }) + }) + .finally(function() { + // Clean up all listeners and subscriptions + channelRouter.removeListener(wireutil.global, messageListener) + channels.forEach(function(channel) { + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + }) + }) + .catch(function(err) { + // Cannot guarantee integrity of client + log.error( + 'Client had an error, disconnecting due to probable loss of integrity' + , err.stack + ) + + socket.disconnect(true) + }) + + + /* + socket.on('flick', function(data) {}) + socket.on('back', function(data) {}) + socket.on('forward', function(data) {}) + socket.on('refresh', function(data) {}) + socket.on('internal.relaunch', function(data) {}) + socket.on('internal.clear', function(data) {}) + socket.on('selenium.setCookie', function(data) {}) + socket.on('selenium.deleteCookie', function(data) {}) + socket.on('selenium.deleteAllCookies', function(data) {}) + socket.on('debug.benchmark.pull.start', function(data) {}) + socket.on('debug.benchmark.pull.stop', function(data) {}) + socket.on('logcat', function(data) {}) + socket.on('debug.benchmark.pull.rate', function(data) {}) + socket.on('cpu.monitor.load', function(data) {}) + + socket.on('safeExecute', function(data) {}) + socket.on('eval', function(data) {}) + socket.on('safeEval', function(data) {}) + socket.on('executeAsync', function(data) {}) + socket.on('safeExecuteAsync', function(data) {}) + socket.on('execute', function(data) {}) + socket.on('screenshot', function(data) {}) + socket.on('selenium.screenshot', function(data) {}) + socket.on('url', function(data) {}) + socket.on('selenium.allCookies', function(data) {}) + */ + //this._react 'selenium.weinre', => + // this._runTransaction 'selenium.weinre', + // targetHost: conf.weinre.httpHost + // targetPort: conf.weinre.httpPort + }) + + server.listen(options.port) + log.info('Listening on port %d', options.port) +} diff --git a/lib/roles/websocket/middleware/auth.js b/lib/roles/websocket/middleware/auth.js new file mode 100644 index 00000000..bebcb451 --- /dev/null +++ b/lib/roles/websocket/middleware/auth.js @@ -0,0 +1,22 @@ +var dbapi = require('../../../db/api') + +module.exports = function(socket, next) { + var req = socket.request + , token = req.session.jwt + if (token) { + return dbapi.loadUser(token.email) + .then(function(user) { + if (user) { + req.user = user + next() + } + else { + next(new Error('Invalid user')) + } + }) + .catch(next) + } + else { + next(new Error('Missing authorization token')) + } +} diff --git a/lib/roles/websocket/middleware/cookie-session.js b/lib/roles/websocket/middleware/cookie-session.js new file mode 100644 index 00000000..14132828 --- /dev/null +++ b/lib/roles/websocket/middleware/cookie-session.js @@ -0,0 +1,10 @@ +var cookieSession = require('cookie-session') + +module.exports = function(options) { + var session = cookieSession(options) + return function(socket, next) { + var req = socket.request + , res = Object.create(null) + session(req, res, next) + } +} diff --git a/lib/roles/websocket/middleware/remote-ip.js b/lib/roles/websocket/middleware/remote-ip.js new file mode 100644 index 00000000..9f25b839 --- /dev/null +++ b/lib/roles/websocket/middleware/remote-ip.js @@ -0,0 +1,9 @@ +var proxyaddr = require('proxy-addr') + +module.exports = function(options) { + return function(socket, next) { + var req = socket.request + req.ip = proxyaddr(req, options.trust) + next() + } +} diff --git a/res/app/components/stf/socket/socket-service.js b/res/app/components/stf/socket/socket-service.js index d1b07ca1..8e473464 100644 --- a/res/app/components/stf/socket/socket-service.js +++ b/res/app/components/stf/socket/socket-service.js @@ -1,7 +1,9 @@ var io = require('socket.io') module.exports = function SocketFactory($rootScope, VersionUpdateService) { - var socket = io('/', { + /*globals APP:false*/ + + var socket = io(APP.websocketUrl, { reconnection: false , transports: ['websocket'] }) diff --git a/res/app/views/index.jade b/res/app/views/index.jade index 75367ee5..b130cc61 100644 --- a/res/app/views/index.jade +++ b/res/app/views/index.jade @@ -22,5 +22,6 @@ html div(growl) div(ng-view).fill-height + script var APP = !{JSON.stringify(APP)} script(src='/static/build/bundle.js') script(src='/static/bower_components/stf-analytics/analytics.js')