var http = require('http') var events = require('events') var path = require('path') var util = require('util') var express = require('express') var validator = require('express-validator') var cookieSession = require('cookie-session') var bodyParser = require('body-parser') var serveFavicon = require('serve-favicon') var serveStatic = require('serve-static') var csrf = require('csurf') var socketio = require('socket.io') var zmq = require('zmq') var Promise = require('bluebird') var httpProxy = require('http-proxy') var _ = require('lodash') var request = Promise.promisifyAll(require('request')) var proxyaddr = require('proxy-addr') var logger = require('../util/logger') var pathutil = require('../util/pathutil') var wire = require('../wire') var wireutil = require('../wire/util') var wirerouter = require('../wire/router') var dbapi = require('../db/api') var datautil = require('../util/datautil') var auth = require('../middleware/auth') var webpack = require('../middleware/webpack') var deviceIconMiddleware = require('../middleware/device-icons') var browserIconMiddleware = require('../middleware/browser-icons') module.exports = function(options) { var log = logger.createLogger('app') , app = express() , server = http.createServer(app) , io = socketio.listen(server, { serveClient: false , transports: ['websocket'] }) , channelRouter = new events.EventEmitter() , proxy = httpProxy.createProxyServer() , sessionMiddleware proxy.on('error', function(err) { log.error('Proxy had an error', err.stack) }) app.set('view engine', 'jade') app.set('views', pathutil.resource('app/views')) app.set('strict routing', true) app.set('case sensitive routing', true) if (!options.disableWatch) { app.use('/static/build', webpack({ debug: true , devtool: 'eval' })) } app.use('/static/bower_components', serveStatic(pathutil.resource('bower_components'))) app.use('/intro', serveStatic(pathutil.resource('bower_components/stf-site/intro'))) app.use('/manual-basic', serveStatic(pathutil.resource('bower_components/stf-site/manual/basic'))) app.use('/manual-advanced', serveStatic(pathutil.resource('bower_components/stf-site/manual/advanced'))) app.use('/v2-features', serveStatic(pathutil.resource('bower_components/stf-site/v2-features'))) app.use('/static/data', serveStatic(pathutil.resource('data'))) app.use('/static/build', serveStatic(pathutil.resource('build'))) app.use('/static/browsers', browserIconMiddleware()) app.use('/static/devices', deviceIconMiddleware()) app.use('/static', serveStatic(pathutil.resource('app'))) app.use(serveFavicon(pathutil.resource( 'bower_components/stf-graphics/logo/exports/STF-128.png'))) app.use(sessionMiddleware = cookieSession({ name: options.ssid , keys: [options.secret] })) app.use(auth({ secret: options.secret , authUrl: options.authUrl })) // Proxied requests must come before any body parsers. These proxies are // here mainly for convenience, they should be replaced with proper reverse // proxies in production. app.all('/api/v1/s/image/*', function(req, res) { proxy.web(req, res, { target: options.storagePluginImageUrl }) }) app.all('/api/v1/s/apk/*', function(req, res) { proxy.web(req, res, { target: options.storagePluginApkUrl }) }) app.all('/api/v1/s/*', function(req, res) { proxy.web(req, res, { target: options.storageUrl }) }) app.use(bodyParser.json()) app.use(csrf()) app.use(validator()) // Output var push = zmq.socket('push') options.endpoints.push.forEach(function(endpoint) { log.info('Sending output to %s', endpoint) push.connect(endpoint) }) // Input var sub = zmq.socket('sub') options.endpoints.sub.forEach(function(endpoint) { log.info('Receiving input from %s', endpoint) sub.connect(endpoint) }) // Establish always-on channels ;[wireutil.global].forEach(function(channel) { log.info('Subscribing to permanent channel "%s"', channel) sub.subscribe(channel) }) sub.on('message', function(channel, data) { channelRouter.emit(channel.toString(), channel, data) }) app.get('/partials/*', function(req, res) { var whitelist = { 'devices/index': true , 'devices/control': true , 'devices/screen': true } if (whitelist.hasOwnProperty(req.params[0])) { res.render(path.join('partials', req.params[0])) } else { res.send(404) } }) app.get('/', function(req, res) { res.render('index') }) app.get('/api/v1/user', function(req, res) { res.json({ success: true , user: req.user }) }) app.get('/api/v1/group', function(req, res) { dbapi.loadGroup(req.user.email) .then(function(cursor) { return Promise.promisify(cursor.toArray, cursor)() .then(function(list) { list.forEach(function(device) { datautil.normalize(device, req.user) }) res.json({ success: true , devices: list }) }) }) .catch(function(err) { log.error('Failed to load group: ', err.stack) res.json(500, { success: false }) }) }) app.get('/api/v1/devices', function(req, res) { dbapi.loadDevices() .then(function(cursor) { return Promise.promisify(cursor.toArray, cursor)() .then(function(list) { list.forEach(function(device) { datautil.normalize(device, req.user) }) res.json({ success: true , devices: list }) }) }) .catch(function(err) { log.error('Failed to load device list: ', err.stack) res.json(500, { success: false }) }) }) app.get('/api/v1/devices/:serial', function(req, res) { dbapi.loadDevice(req.params.serial) .then(function(device) { if (device) { datautil.normalize(device, req.user) res.json({ success: true , device: device }) } else { res.json(404, { success: false }) } }) .catch(function(err) { log.error('Failed to load device "%s": ', req.params.serial, err.stack) res.json(500, { success: false }) }) }) io.use(function(socket, next) { var req = socket.request , res = Object.create(null) sessionMiddleware(req, res, next) }) io.use(function(socket, next) { var req = socket.request // This is similar to what Express does behind the scenes req.ip = proxyaddr(req, app.get('trust proxy fn')) next() }) io.use(function(socket, next) { var req = socket.request , token = req.session.jwt if (token) { return dbapi.loadUser(token.email) .then(function(user) { if (user) { req.user = user next() } else { next(new Error('Invalid user')) } }) .catch(next) } else { next(new Error('Missing authorization token')) } }) io.on('connection', function(socket) { var channels = [] , user = socket.request.user socket.emit('socket.ip', socket.request.ip) function joinChannel(channel) { channels.push(channel) channelRouter.on(channel, messageListener) sub.subscribe(channel) } function leaveChannel(channel) { _.pull(channels, channel) channelRouter.removeListener(channel, messageListener) sub.unsubscribe(channel) } function createTouchHandler(Klass) { return function(channel, data) { push.send([ channel , wireutil.envelope(new Klass( data.seq , data.x , data.y )) ]) } } function createKeyHandler(Klass) { return function(channel, data) { push.send([ channel , wireutil.envelope(new Klass( data.key )) ]) } } var messageListener = wirerouter() .on(wire.DeviceLogMessage, function(channel, message) { socket.emit('device.log', message) }) .on(wire.DevicePresentMessage, function(channel, message) { socket.emit('device.add', { important: true , data: { serial: message.serial , present: true } }) }) .on(wire.DeviceAbsentMessage, function(channel, message) { socket.emit('device.remove', { important: true , data: { serial: message.serial , present: false , ready: false , lastHeartbeatAt: null , using: false } }) }) .on(wire.JoinGroupMessage, function(channel, message) { socket.emit('device.change', { important: true , data: datautil.applyOwner({ serial: message.serial , owner: message.owner } , user ) }) }) .on(wire.LeaveGroupMessage, function(channel, message) { socket.emit('device.change', { important: true , data: datautil.applyOwner({ serial: message.serial , owner: null } , user ) }) }) .on(wire.DeviceStatusMessage, function(channel, message) { socket.emit('device.change', { important: true , data: message }) }) .on(wire.DeviceIdentityMessage, function(channel, message) { datautil.applyData(message) message.ready = true socket.emit('device.change', { important: true , data: message }) }) .on(wire.TransactionProgressMessage, function(channel, message) { socket.emit('tx.progress', channel.toString(), message) }) .on(wire.TransactionDoneMessage, function(channel, message) { socket.emit('tx.done', channel.toString(), message) }) .on(wire.DeviceLogcatEntryMessage, function(channel, message) { socket.emit('logcat.entry', message) }) .on(wire.AirplaneModeEvent, function(channel, message) { socket.emit('device.change', { important: true , data: { serial: message.serial , airplaneMode: message.enabled } }) }) .on(wire.BatteryEvent, function(channel, message) { var serial = message.serial delete message.serial socket.emit('device.change', { important: false , data: { serial: serial , battery: message } }) }) .on(wire.DeviceBrowserMessage, function(channel, message) { var serial = message.serial delete message.serial socket.emit('device.change', { important: true , data: datautil.applyBrowsers({ serial: serial , browser: message }) }) }) .on(wire.ConnectivityEvent, function(channel, message) { var serial = message.serial delete message.serial socket.emit('device.change', { important: false , data: { serial: serial , network: message } }) }) .on(wire.PhoneStateEvent, function(channel, message) { var serial = message.serial delete message.serial socket.emit('device.change', { important: false , data: { serial: serial , network: message } }) }) .on(wire.RotationEvent, function(channel, message) { socket.emit('device.change', { important: false , data: { serial: message.serial , display: { orientation: message.rotation } } }) }) .handler() // Global messages // // @todo Use socket.io to push global events to all clients instead // of listening on every connection, otherwise we're very likely to // hit EventEmitter's leak complaints (plus it's more work) channelRouter.on(wireutil.global, messageListener) // User's private group joinChannel(user.group) new Promise(function(resolve) { socket.on('disconnect', resolve) // Touch events .on('input.touchDown', createTouchHandler(wire.TouchDownMessage)) .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage)) .on('input.touchUp', createTouchHandler(wire.TouchUpMessage)) .on('input.tap', createTouchHandler(wire.TapMessage)) // Key events .on('input.keyDown', createKeyHandler(wire.KeyDownMessage)) .on('input.keyUp', createKeyHandler(wire.KeyUpMessage)) .on('input.keyPress', createKeyHandler(wire.KeyPressMessage)) .on('input.type', function(channel, data) { push.send([ channel , wireutil.envelope(new wire.TypeMessage( data.text )) ]) }) .on('display.rotate', function(channel, data) { push.send([ channel , wireutil.envelope(new wire.RotateMessage( data.rotation )) ]) }) // Transactions .on('clipboard.paste', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.PasteMessage(data.text) ) ]) }) .on('clipboard.copy', function(channel, responseChannel) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.CopyMessage() ) ]) }) .on('device.identify', function(channel, responseChannel) { push.send([ channel , wireutil.transaction( responseChannel , new wire.PhysicalIdentifyMessage() ) ]) }) .on('group.invite', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.GroupMessage( new wire.OwnerMessage( user.email , user.name , user.group ) , data.timeout || null , wireutil.toDeviceRequirements(data.requirements) ) ) ]) }) .on('group.kick', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.UngroupMessage( wireutil.toDeviceRequirements(data.requirements) ) ) ]) }) .on('tx.cleanup', function(channel) { leaveChannel(channel) }) .on('tx.punch', function(channel) { joinChannel(channel) socket.emit('tx.punch', channel) }) .on('shell.command', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.ShellCommandMessage(data) ) ]) }) .on('shell.keepalive', function(channel, data) { push.send([ channel , wireutil.envelope(new wire.ShellKeepAliveMessage(data)) ]) }) .on('device.install', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.InstallMessage( data.href , data.launch === true , JSON.stringify(data.manifest) ) ) ]) }) .on('device.uninstall', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.UninstallMessage(data) ) ]) }) .on('storage.upload', function(channel, responseChannel, data) { joinChannel(responseChannel) request.postAsync({ url: util.format( '%sapi/v1/resources?channel=%s' , options.storageUrl , responseChannel ) , json: true , body: { url: data.url } }) .catch(function(err) { log.error('Storage upload had an error', err.stack) leaveChannel(responseChannel) socket.emit('tx.cancel', responseChannel, { success: false , data: 'fail_upload' }) }) }) .on('forward.test', function(channel, responseChannel, data) { joinChannel(responseChannel) if (!data.targetHost || data.targetHost === 'localhost') { data.targetHost = ip } push.send([ channel , wireutil.transaction( responseChannel , new wire.ForwardTestMessage(data) ) ]) }) .on('forward.create', function(channel, responseChannel, data) { if (!data.targetHost || data.targetHost === 'localhost') { data.targetHost = ip } dbapi.addUserForward(user.email, data) .then(function() { socket.emit('forward.create', data) joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.ForwardCreateMessage(data) ) ]) }) }) .on('forward.remove', function(channel, responseChannel, data) { dbapi.removeUserForward(user.email, data.devicePort) .then(function() { socket.emit('forward.remove', data) joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.ForwardRemoveMessage(data) ) ]) }) }) .on('logcat.start', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.LogcatStartMessage(data) ) ]) }) .on('logcat.stop', function(channel, responseChannel) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.LogcatStopMessage() ) ]) }) .on('browser.open', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.BrowserOpenMessage(data) ) ]) }) .on('browser.clear', function(channel, responseChannel, data) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.BrowserClearMessage(data) ) ]) }) .on('store.open', function(channel, responseChannel) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.StoreOpenMessage() ) ]) }) .on('screen.capture', function(channel, responseChannel) { joinChannel(responseChannel) push.send([ channel , wireutil.transaction( responseChannel , new wire.ScreenCaptureMessage() ) ]) }) }) .finally(function() { // Clean up all listeners and subscriptions channelRouter.removeListener(wireutil.global, messageListener) channels.forEach(function(channel) { channelRouter.removeListener(channel, messageListener) sub.unsubscribe(channel) }) }) .catch(function(err) { // Cannot guarantee integrity of client log.error( 'Client had an error, disconnecting due to probable loss of integrity' , err.stack ) socket.disconnect(true) }) /* socket.on('flick', function(data) {}) socket.on('back', function(data) {}) socket.on('forward', function(data) {}) socket.on('refresh', function(data) {}) socket.on('internal.relaunch', function(data) {}) socket.on('internal.clear', function(data) {}) socket.on('selenium.setCookie', function(data) {}) socket.on('selenium.deleteCookie', function(data) {}) socket.on('selenium.deleteAllCookies', function(data) {}) socket.on('debug.benchmark.pull.start', function(data) {}) socket.on('debug.benchmark.pull.stop', function(data) {}) socket.on('logcat', function(data) {}) socket.on('debug.benchmark.pull.rate', function(data) {}) socket.on('cpu.monitor.load', function(data) {}) socket.on('safeExecute', function(data) {}) socket.on('eval', function(data) {}) socket.on('safeEval', function(data) {}) socket.on('executeAsync', function(data) {}) socket.on('safeExecuteAsync', function(data) {}) socket.on('execute', function(data) {}) socket.on('screenshot', function(data) {}) socket.on('selenium.screenshot', function(data) {}) socket.on('url', function(data) {}) socket.on('selenium.allCookies', function(data) {}) */ //this._react 'selenium.weinre', => // this._runTransaction 'selenium.weinre', // targetHost: conf.weinre.httpHost // targetPort: conf.weinre.httpPort }) server.listen(options.port) log.info('Listening on port %d', options.port) }