diff --git a/bower.json b/bower.json index 46eb6d8f..4a063423 100644 --- a/bower.json +++ b/bower.json @@ -5,7 +5,8 @@ "angular": "~1.2.9", "angular-route": "~1.2.9", "requirejs": "~2.1.10", - "se7en-bootstrap-3": "git@ghe.amb.ca.local:stf/se7en-bootstrap-3.git" + "se7en-bootstrap-3": "git@ghe.amb.ca.local:stf/se7en-bootstrap-3.git", + "socket.io-client": "~0.9.16" }, "private": true } diff --git a/lib/cli.js b/lib/cli.js index 95756ace..18e7ada6 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -335,6 +335,12 @@ program .option('-a, --auth-url ' , 'URL to auth client' , String) + .option('-u, --connect-sub ' + , 'sub endpoint' + , cliutil.list) + .option('-c, --connect-push ' + , 'push endpoint' + , cliutil.list) .action(function(options) { if (!options.secret) { this.missingArgument('--secret') @@ -342,12 +348,23 @@ program if (!options.authUrl) { this.missingArgument('--auth-url') } + if (!options.connectSub) { + this.missingArgument('--connect-sub') + } + if (!options.connectPush) { + this.missingArgument('--connect-push') + } require('./roles/app')({ port: options.port , secret: options.secret , ssid: options.ssid , authUrl: options.authUrl + , groupTimeout: 10000 + , endpoints: { + sub: options.connectSub + , push: options.connectPush + } }) }) @@ -493,6 +510,8 @@ program , '--port', options.appPort , '--secret', options.authSecret , '--auth-url', util.format('http://localhost:%d/', options.authPort) + , '--connect-sub', options.bindAppPub + , '--connect-push', options.bindAppPull ]) .catch(function(err) { log.error('app died', err.stack) diff --git a/lib/db/api.js b/lib/db/api.js index e9988ce9..aa32a61e 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -1,16 +1,30 @@ var r = require('rethinkdb') var db = require('./') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) module.exports.saveUserAfterLogin = function(user) { - return db.run(r.table('users').insert({ - email: user.email - , name: user.name - , lastLogin: r.now() - } - , { - upsert: true + return db.run(r.table('users').get(user.email).update({ + name: user.name + , lastLoggedInAt: r.now() })) + .then(function(stats) { + if (stats.skipped) { + return db.run(r.table('users').insert({ + email: user.email + , name: user.name + , group: wireutil.makePrivateChannel() + , lastLoggedInAt: r.now() + , createdAt: r.now() + })) + } + return stats + }) +} + +module.exports.loadUser = function(email) { + return db.run(r.table('users').get(email)) } module.exports.saveDeviceLog = function(serial, entry) { diff --git a/lib/roles/app.js b/lib/roles/app.js index cd957a8c..1978ebaa 100644 --- a/lib/roles/app.js +++ b/lib/roles/app.js @@ -1,26 +1,40 @@ var url = require('url') +var http = require('http') +var events = require('events') var express = require('express') var validator = require('express-validator') +var socketio = require('socket.io') +var zmq = require('zmq') +var Promise = require('bluebird') var logger = require('../util/logger') var pathutil = require('../util/pathutil') +var wire = require('../wire') +var wireutil = require('../util/wireutil')(wire) +var dbapi = require('../db/api') var auth = require('../middleware/auth') module.exports = function(options) { var log = logger.createLogger('app') , app = express() + , server = http.createServer(app) + , io = socketio.listen(server) + , router = new events.EventEmitter() app.set('view engine', 'jade') app.set('views', pathutil.resource('app/views')) app.set('strict routing', true) app.set('case sensitive routing', true) + app.set('trust proxy', true) - app.use(express.cookieParser()) + io.set('log level', 1) + io.set('browser client', false) + + app.use(express.cookieParser(options.secret)) app.use(express.cookieSession({ - secret: options.secret - , key: options.ssid + key: options.ssid })) app.use(auth({ secret: options.secret @@ -33,10 +47,182 @@ module.exports = function(options) { app.use('/static/lib', express.static(pathutil.resource('lib'))) app.use('/static', express.static(pathutil.resource('app'))) + // Output + var push = zmq.socket('push') + options.endpoints.push.forEach(function(endpoint) { + log.info('Sending output to %s', endpoint) + push.connect(endpoint) + }) + + // Input + var sub = zmq.socket('sub') + options.endpoints.sub.forEach(function(endpoint) { + log.info('Receiving input from %s', endpoint) + sub.connect(endpoint) + }) + + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + + sub.on('message', function(channel, data) { + router.emit( + channel.toString() + , channel + , wire.Envelope.decode(data) + ) + }) + + app.get('/partials/:name', function(req, res) { + var whitelist = { + 'deviceList': true + } + + if (whitelist[req.params.name]) { + res.render('partials/' + req.params.name) + } + else { + res.send(404) + } + }) + app.get('/', function(req, res) { res.render('index') }) - app.listen(options.port) + io.set('authorization', (function() { + var parse = Promise.promisify(express.cookieParser(options.secret)) + return function(handshake, accept) { + parse(handshake, {}) + .then(function() { + if (handshake.signedCookies[options.ssid]) { + handshake.session = handshake.signedCookies[options.ssid] + return dbapi.loadUser(handshake.session.jwt.email) + .then(function(user) { + handshake.user = user + accept(null, true) + }) + } + else { + accept(null, false) + } + }) + .catch(function(err) { + accept(null, false) + }) + } + })()) + + io.on('connection', function(socket) { + var channels = [] + , group = socket.handshake.user.group + + function messageListener(channel, wrapper) { + switch (wrapper.type) { + case wire.MessageType.JOIN_GROUP: + var message = wire.JoinGroupMessage.decode(wrapper.message) + socket.emit('join', message) + break + case wire.MessageType.LEAVE_GROUP: + var message = wire.LeaveGroupMessage.decode(wrapper.message) + socket.emit('leave', message) + break + } + } + + // Global messages + // + // @todo Use socket.io to push global events to all clients instead + // of listening on every connection, otherwise we're very likely to + // hit EventEmitter's leak complaints (plus it's more work) + channels.push(wireutil.global) + router.on(wireutil.global, messageListener) + + // User's private group + channels.push(group) + sub.subscribe(group) + router.on(group, messageListener) + + // Clean up all listeners and subscriptions + socket.on('disconnect', function() { + channels.forEach(function(channel) { + router.removeListener(channel, messageListener) + sub.unsubscribe(channel) + }) + }) + + socket.on('invite', function(data) { + push.send([wireutil.global, wireutil.makeGroupMessage( + group + , options.groupTimeout + , data + )]) + }) + + socket.on('kick', function(data) { + push.send([group, wireutil.makeUngroupMessage( + group + , data + )]) + }) + + socket.on('flick', function(data) {}) + socket.on('back', function(data) {}) + socket.on('forward', function(data) {}) + socket.on('refresh', function(data) {}) + socket.on('monkey.touchDown', function(data) {}) + socket.on('monkey.touchMove', function(data) {}) + socket.on('monkey.touchUp', function(data) {}) + socket.on('monkey.keyDown', function(data) {}) + socket.on('monkey.keyUp', function(data) {}) + socket.on('monkey.press', function(data) {}) + socket.on('monkey.type', function(data) {}) + socket.on('monkey.back', function(data) {}) + socket.on('monkey.home', function(data) {}) + socket.on('monkey.menu', function(data) {}) + socket.on('internal.relaunch', function(data) {}) + socket.on('browser.open', function(data) {}) + socket.on('chrome.open', function(data) {}) + socket.on('browser.clear', function(data) {}) + socket.on('chrome.clear', function(data) {}) + socket.on('internal.clear', function(data) {}) + socket.on('selenium.setCookie', function(data) {}) + socket.on('selenium.deleteCookie', function(data) {}) + socket.on('selenium.deleteAllCookies', function(data) {}) + socket.on('debug.benchmark.pull.start', function(data) {}) + socket.on('debug.benchmark.pull.stop', function(data) {}) + socket.on('logcat', function(data) {}) + socket.on('debug.benchmark.pull.rate', function(data) {}) + socket.on('cpu.monitor.load', function(data) {}) + + socket.on('safeExecute', function(data) {}) + socket.on('eval', function(data) {}) + socket.on('safeEval', function(data) {}) + socket.on('executeAsync', function(data) {}) + socket.on('safeExecuteAsync', function(data) {}) + socket.on('execute', function(data) {}) + socket.on('screen', function(data) {}) + socket.on('screenshot', function(data) {}) + socket.on('selenium.screenshot', function(data) {}) + socket.on('url', function(data) {}) + socket.on('selenium.allCookies', function(data) {}) + socket.on('forward.unset', function(data) {}) + socket.on('forward.list', function(data) {}) + + //this._react 'forward.test', (data = {}) => + // this._runTransaction 'forward.test', + // this._insertOptionalIp data, 'targetHost' + //this._react 'forward.set', (data = {}) => + // this._runTransaction 'forward.set', + // this._insertOptionalIp data, 'targetHost' + //this._react 'selenium.weinre', => + // this._runTransaction 'selenium.weinre', + // targetHost: conf.weinre.httpHost + // targetPort: conf.weinre.httpPort + }) + + server.listen(options.port) log.info('Listening on port %d', options.port) } diff --git a/lib/roles/device.js b/lib/roles/device.js index e6d938af..08044eff 100644 --- a/lib/roles/device.js +++ b/lib/roles/device.js @@ -346,6 +346,17 @@ module.exports = function(options) { wireutil.makeJoinGroupMessage(options.serial)]) } break + case wire.MessageType.UNGROUP: + var message = wire.UngroupMessage.decode(wrapper.message) + , groupChannel = message.channel + if (devutil.matchesRequirements(identity, message.requirements)) { + channels.unregister(groupChannel) + log.info('Unsubscribing from group channel "%s"', groupChannel) + sub.unsubscribe(groupChannel) + push.send([groupChannel, + wireutil.makeLeaveGroupMessage(options.serial)]) + } + break case wire.MessageType.SHELL_COMMAND: var message = wire.ShellCommandMessage.decode(wrapper.message) log.info('Running shell command "%s"', message.command.join(' ')) diff --git a/lib/util/wireutil.js b/lib/util/wireutil.js index 183cd12e..446eb09e 100644 --- a/lib/util/wireutil.js +++ b/lib/util/wireutil.js @@ -47,6 +47,10 @@ module.exports = function(wire) { return wireutil.envelope(wire.MessageType.GROUP, message) } + , makeUngroupMessage: function(requirements) { + var message = new wire.UngroupMessage(requirements) + return wireutil.envelope(wire.MessageType.UNGROUP, message) + } , makeJoinGroupMessage: function(serial) { var message = new wire.JoinGroupMessage(serial) return wireutil.envelope(wire.MessageType.JOIN_GROUP, message) diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index aea1ddc7..a5ec7cdd 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -6,6 +6,7 @@ enum MessageType { DEVICE_TYPE = 3; DEVICE_PROPERTIES = 4; GROUP = 5; + UNGROUP = 15; JOIN_GROUP = 6; LEAVE_GROUP = 7; PROBE = 8; @@ -150,6 +151,11 @@ message GroupMessage { repeated DeviceRequirement requirements = 3; } +message UngroupMessage { + required string channel = 1; + repeated DeviceRequirement requirements = 2; +} + message JoinGroupMessage { required string serial = 1; } diff --git a/res/app/scripts/app.js b/res/app/scripts/app.js index 9e34d041..9bb5b8cb 100644 --- a/res/app/scripts/app.js +++ b/res/app/scripts/app.js @@ -1,11 +1,13 @@ define([ 'angular' , './controllers/index' + , './services/index' ] , function(ng) { return ng.module('app', [ 'ngRoute' , 'app.controllers' + , 'app.services' ]) } ) diff --git a/res/app/scripts/controllers/DeviceListCtrl.js b/res/app/scripts/controllers/DeviceListCtrl.js new file mode 100644 index 00000000..56fa1b4b --- /dev/null +++ b/res/app/scripts/controllers/DeviceListCtrl.js @@ -0,0 +1,5 @@ +define(['./module'], function(mod) { + mod.controller('DeviceListCtrl', ['$scope', '$http', 'devices', function($scope, $http) { + + }]) +}) diff --git a/res/app/scripts/controllers/index.js b/res/app/scripts/controllers/index.js index 44e3831b..962c0964 100644 --- a/res/app/scripts/controllers/index.js +++ b/res/app/scripts/controllers/index.js @@ -1,4 +1,5 @@ define([ + './DeviceListCtrl' ] , function() { } diff --git a/res/app/scripts/main.js b/res/app/scripts/main.js index 8fd793b2..e0eddecc 100644 --- a/res/app/scripts/main.js +++ b/res/app/scripts/main.js @@ -2,6 +2,7 @@ require.config({ paths: { 'angular': '../lib/angular/angular' , 'angular-route': '../lib/angular-route/angular-route' + , 'socket.io': '../lib/socket.io-client/dist/socket.io' } , shim: { 'angular': { diff --git a/res/app/scripts/routes.js b/res/app/scripts/routes.js index d207b550..37f480ea 100644 --- a/res/app/scripts/routes.js +++ b/res/app/scripts/routes.js @@ -6,8 +6,8 @@ define(['./app'], function(app) { $locationProvider.html5Mode(true) $routeProvider .when('/', { - templateUrl: 'partials/signin' - , controller: 'SignInCtrl' + templateUrl: 'partials/deviceList' + , controller: 'DeviceListCtrl' }) .otherwise({ redirectTo: '/' diff --git a/res/app/scripts/services/devices.js b/res/app/scripts/services/devices.js new file mode 100644 index 00000000..2f0ab86a --- /dev/null +++ b/res/app/scripts/services/devices.js @@ -0,0 +1,5 @@ +define(['./module'], function(mod) { + mod.factory('devices', ['io', function(io) { + return {} + }]) +}) diff --git a/res/app/scripts/services/index.js b/res/app/scripts/services/index.js new file mode 100644 index 00000000..1d90a898 --- /dev/null +++ b/res/app/scripts/services/index.js @@ -0,0 +1,7 @@ +define([ + './io' + , './devices' + ] +, function() { + } +) diff --git a/res/app/scripts/services/io.js b/res/app/scripts/services/io.js new file mode 100644 index 00000000..c88138b1 --- /dev/null +++ b/res/app/scripts/services/io.js @@ -0,0 +1,5 @@ +define(['./module', 'socket.io'], function(mod, io) { + mod.factory('io', [function() { + return io.connect() + }]) +}) diff --git a/res/app/scripts/services/module.js b/res/app/scripts/services/module.js new file mode 100644 index 00000000..242eb33f --- /dev/null +++ b/res/app/scripts/services/module.js @@ -0,0 +1,3 @@ +define(['angular'], function(ng) { + return ng.module('app.services', []) +}) diff --git a/res/app/views/partials/deviceList.jade b/res/app/views/partials/deviceList.jade new file mode 100644 index 00000000..7d9121cd --- /dev/null +++ b/res/app/views/partials/deviceList.jade @@ -0,0 +1 @@ +h1 Devices