1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 18:29:17 +02:00
OpenSTF/lib/roles/app.js
2014-06-03 21:00:00 +09:00

771 lines
22 KiB
JavaScript

var http = require('http')
var events = require('events')
var path = require('path')
var util = require('util')
var express = require('express')
var validator = require('express-validator')
var cookieSession = require('cookie-session')
var bodyParser = require('body-parser')
var serveFavicon = require('serve-favicon')
var serveStatic = require('serve-static')
var csrf = require('csurf')
var socketio = require('socket.io')
var zmq = require('zmq')
var Promise = require('bluebird')
var httpProxy = require('http-proxy')
var _ = require('lodash')
var request = Promise.promisifyAll(require('request'))
var proxyaddr = require('proxy-addr')
var logger = require('../util/logger')
var pathutil = require('../util/pathutil')
var wire = require('../wire')
var wireutil = require('../wire/util')
var wirerouter = require('../wire/router')
var dbapi = require('../db/api')
var datautil = require('../util/datautil')
var auth = require('../middleware/auth')
var webpack = require('../middleware/webpack')
var deviceIconMiddleware = require('../middleware/device-icons')
var browserIconMiddleware = require('../middleware/browser-icons')
module.exports = function(options) {
// Core server objects: the Express app, the HTTP server wrapping it and a
// websocket-only socket.io instance attached to that server.
var log = logger.createLogger('app')
var app = express()
var server = http.createServer(app)
var io = socketio.listen(server, {
  serveClient: false
, transports: ['websocket']
})

// Re-emits messages received on the ZeroMQ sub socket, keyed by channel name.
var channelRouter = new events.EventEmitter()

// Backs the convenience storage proxies registered further below.
var proxy = httpProxy.createProxyServer()

// Assigned when the session middleware is installed; the socket.io handshake
// runs the same instance.
var sessionMiddleware

proxy.on('error', function(proxyError) {
  log.error('Proxy had an error', proxyError.stack)
})
// Template rendering
app.set('view engine', 'jade')
app.set('views', pathutil.resource('app/views'))

// Trailing slashes and letter case are significant when matching routes
app.set('strict routing', true)
app.set('case sensitive routing', true)

// Unless watching is disabled, builds are served by the webpack middleware,
// which is mounted ahead of the static "build" directory below.
if (!options.disableWatch) {
  var webpackOptions = {
    debug: true
  , devtool: 'eval'
  }
  app.use('/static/build', webpack(webpackOptions))
}

// [mount path, resource directory] pairs, registered in this exact order
var staticMounts = [
  ['/static/bower_components', 'bower_components']
, ['/intro', 'bower_components/stf-site/intro']
, ['/manual-basic', 'bower_components/stf-site/manual/basic']
, ['/manual-advanced', 'bower_components/stf-site/manual/advanced']
, ['/v2-features', 'bower_components/stf-site/v2-features']
, ['/static/data', 'data']
, ['/static/build', 'build']
, ['/static/status', 'common/status']
]
staticMounts.forEach(function(mount) {
  app.use(mount[0], serveStatic(pathutil.resource(mount[1])))
})

// The icon middlewares are mounted before the catch-all "/static" mount
app.use('/static/browsers', browserIconMiddleware())
app.use('/static/devices', deviceIconMiddleware())
app.use('/static', serveStatic(pathutil.resource('app')))

var faviconPath = pathutil.resource(
  'bower_components/stf-graphics/logo/exports/STF-128.png')
app.use(serveFavicon(faviconPath))
// Keep a handle on the session middleware so that the socket.io handshake
// can run the very same instance.
sessionMiddleware = cookieSession({
  name: options.ssid
, keys: [options.secret]
})
app.use(sessionMiddleware)

var authOptions = {
  secret: options.secret
, authUrl: options.authUrl
}
app.use(auth(authOptions))
// Proxied requests must come before any body parsers. These proxies are
// here mainly for convenience, they should be replaced with proper reverse
// proxies in production.

// Builds a handler that proxies the request to the URL stored under the
// given option name. The option is read on every request.
function proxyTo(targetOption) {
  return function(req, res) {
    proxy.web(req, res, {
      target: options[targetOption]
    })
  }
}

// The more specific image and apk routes are registered before the generic
// storage route.
app.all('/api/v1/s/image/*', proxyTo('storagePluginImageUrl'))
app.all('/api/v1/s/apk/*', proxyTo('storagePluginApkUrl'))
app.all('/api/v1/s/*', proxyTo('storageUrl'))

// Everything registered from here on sees parsed JSON bodies, CSRF
// protection and the request validation helpers.
app.use(bodyParser.json())
app.use(csrf())
app.use(validator())
// Output
var push = zmq.socket('push')
var pushEndpoints = options.endpoints.push
pushEndpoints.forEach(function(pushEndpoint) {
  log.info('Sending output to %s', pushEndpoint)
  push.connect(pushEndpoint)
})

// Input
var sub = zmq.socket('sub')
var subEndpoints = options.endpoints.sub
subEndpoints.forEach(function(subEndpoint) {
  log.info('Receiving input from %s', subEndpoint)
  sub.connect(subEndpoint)
})

// Establish always-on channels
var permanentChannels = [wireutil.global]
permanentChannels.forEach(function(permanentChannel) {
  log.info('Subscribing to permanent channel "%s"', permanentChannel)
  sub.subscribe(permanentChannel)
})

// Re-emit every incoming message under its channel name so that listeners
// can register per channel.
sub.on('message', function(channel, payload) {
  var eventName = channel.toString()
  channelRouter.emit(eventName, channel, payload)
})
// Renders one of the known partial views; anything else is answered with 404
app.get('/partials/*', function(req, res) {
  var allowedPartials = {
    'devices/index': true
  , 'devices/control': true
  , 'devices/screen': true
  }
  var requested = req.params[0]

  if (!allowedPartials.hasOwnProperty(requested)) {
    res.send(404)
    return
  }

  res.render(path.join('partials', requested))
})
// Main page
app.get('/', function(req, res) {
  res.render('index')
})

// Returns the user attached to the request
app.get('/api/v1/user', function(req, res) {
  var payload = {
    success: true
  , user: req.user
  }
  res.json(payload)
})
// Reads all rows from the cursor the given promise resolves to, normalizes
// each device for the requesting user and responds with the list. Any
// failure is logged with the given message and answered with a 500.
function sendDeviceList(req, res, cursorPromise, failureMessage) {
  cursorPromise
    .then(function(cursor) {
      return Promise.promisify(cursor.toArray, cursor)()
    })
    .then(function(devices) {
      devices.forEach(function(device) {
        datautil.normalize(device, req.user)
      })
      res.json({
        success: true
      , devices: devices
      })
    })
    .catch(function(err) {
      log.error(failureMessage, err.stack)
      res.json(500, {
        success: false
      })
    })
}

// Devices loaded for the requesting user's email
app.get('/api/v1/group', function(req, res) {
  sendDeviceList(
    req
  , res
  , dbapi.loadGroup(req.user.email)
  , 'Failed to load group: '
  )
})

// All devices
app.get('/api/v1/devices', function(req, res) {
  sendDeviceList(
    req
  , res
  , dbapi.loadDevices()
  , 'Failed to load device list: '
  )
})
// Returns a single device by serial, or 404 if no such device was loaded
app.get('/api/v1/devices/:serial', function(req, res) {
  var serial = req.params.serial

  dbapi.loadDevice(serial)
    .then(function(device) {
      if (!device) {
        res.json(404, {
          success: false
        })
        return
      }

      datautil.normalize(device, req.user)
      res.json({
        success: true
      , device: device
      })
    })
    .catch(function(err) {
      log.error('Failed to load device "%s": ', serial, err.stack)
      res.json(500, {
        success: false
      })
    })
})
// Run the same session middleware Express uses on the handshake request.
// There is no real response at this point, so a bare placeholder object is
// passed in its place.
io.use(function(socket, next) {
  sessionMiddleware(socket.request, Object.create(null), next)
})

// Resolve the client's address.
io.use(function(socket, next) {
  var handshake = socket.request
  // This is similar to what Express does behind the scenes
  handshake.ip = proxyaddr(handshake, app.get('trust proxy fn'))
  next()
})

// Only let through clients whose session carries a token for a known user;
// the loaded user is attached to the handshake request.
io.use(function(socket, next) {
  var handshake = socket.request
  var token = handshake.session.jwt

  if (!token) {
    next(new Error('Missing authorization token'))
    return
  }

  return dbapi.loadUser(token.email)
    .then(function(user) {
      if (!user) {
        next(new Error('Invalid user'))
        return
      }
      handshake.user = user
      next()
    })
    .catch(next)
})
io.on('connection', function(socket) {
// Channels joined on behalf of this client; they are released on disconnect
var channels = []
// The user attached to the handshake request during authentication
var user = socket.request.user

// Tell the client which address the server sees it as
socket.emit('socket.ip', socket.request.ip)
// Starts relaying messages from the given channel to this client: the
// channel is recorded, the listener is attached and the sub socket
// subscribes to it.
function joinChannel(channel) {
  channels.push(channel)
  channelRouter.addListener(channel, messageListener)
  sub.subscribe(channel)
}

// Reverses joinChannel() for the given channel.
function leaveChannel(channel) {
  _.pull(channels, channel)
  channelRouter.removeListener(channel, messageListener)
  sub.unsubscribe(channel)
}
// Builds a socket event handler that wraps the incoming touch data
// (seq, x, y) in the given wire message class and pushes it to the channel.
function createTouchHandler(Klass) {
  return function(channel, data) {
    var message = new Klass(data.seq, data.x, data.y)
    push.send([channel, wireutil.envelope(message)])
  }
}

// Builds a socket event handler that wraps the incoming key in the given
// wire message class and pushes it to the channel.
function createKeyHandler(Klass) {
  return function(channel, data) {
    var message = new Klass(data.key)
    push.send([channel, wireutil.envelope(message)])
  }
}
// Emits a "device.change" event carrying a partial device update
function emitDeviceChange(important, data) {
  socket.emit('device.change', {
    important: important
  , data: data
  })
}

// Removes the serial from a wire message and returns it, so that the rest
// of the message can be nested under that serial in an update
function detachSerial(msg) {
  var serial = msg.serial
  delete msg.serial
  return serial
}

// Translates wire messages arriving on the channels this connection listens
// to into socket.io events for the client
var messageListener = wirerouter()
  .on(wire.DeviceLogMessage, function(channel, msg) {
    socket.emit('device.log', msg)
  })
  .on(wire.DevicePresentMessage, function(channel, msg) {
    socket.emit('device.add', {
      important: true
    , data: {
        serial: msg.serial
      , present: true
      }
    })
  })
  .on(wire.DeviceAbsentMessage, function(channel, msg) {
    socket.emit('device.remove', {
      important: true
    , data: {
        serial: msg.serial
      , present: false
      , ready: false
      , lastHeartbeatAt: null
      , using: false
      }
    })
  })
  .on(wire.JoinGroupMessage, function(channel, msg) {
    var ownership = {
      serial: msg.serial
    , owner: msg.owner
    }
    emitDeviceChange(true, datautil.applyOwner(ownership, user))
  })
  .on(wire.LeaveGroupMessage, function(channel, msg) {
    var ownership = {
      serial: msg.serial
    , owner: null
    }
    emitDeviceChange(true, datautil.applyOwner(ownership, user))
  })
  .on(wire.DeviceStatusMessage, function(channel, msg) {
    emitDeviceChange(true, msg)
  })
  .on(wire.DeviceIdentityMessage, function(channel, msg) {
    datautil.applyData(msg)
    msg.ready = true
    emitDeviceChange(true, msg)
  })
  .on(wire.TransactionProgressMessage, function(channel, msg) {
    socket.emit('tx.progress', channel.toString(), msg)
  })
  .on(wire.TransactionDoneMessage, function(channel, msg) {
    socket.emit('tx.done', channel.toString(), msg)
  })
  .on(wire.DeviceLogcatEntryMessage, function(channel, msg) {
    socket.emit('logcat.entry', msg)
  })
  .on(wire.AirplaneModeEvent, function(channel, msg) {
    emitDeviceChange(true, {
      serial: msg.serial
    , airplaneMode: msg.enabled
    })
  })
  .on(wire.BatteryEvent, function(channel, msg) {
    emitDeviceChange(false, {
      serial: detachSerial(msg)
    , battery: msg
    })
  })
  .on(wire.DeviceBrowserMessage, function(channel, msg) {
    emitDeviceChange(true, datautil.applyBrowsers({
      serial: detachSerial(msg)
    , browser: msg
    }))
  })
  .on(wire.ConnectivityEvent, function(channel, msg) {
    emitDeviceChange(false, {
      serial: detachSerial(msg)
    , network: msg
    })
  })
  .on(wire.PhoneStateEvent, function(channel, msg) {
    // Reported under the same "network" key as connectivity events
    emitDeviceChange(false, {
      serial: detachSerial(msg)
    , network: msg
    })
  })
  .on(wire.RotationEvent, function(channel, msg) {
    emitDeviceChange(false, {
      serial: msg.serial
    , display: {
        orientation: msg.rotation
      }
    })
  })
  .handler()

// Global messages
//
// @todo Use socket.io to push global events to all clients instead
// of listening on every connection, otherwise we're very likely to
// hit EventEmitter's leak complaints (plus it's more work)
channelRouter.on(wireutil.global, messageListener)

// User's private group
joinChannel(user.group)
// Sends a one-way message to the given channel
function sendMessage(channel, message) {
  push.send([
    channel
  , wireutil.envelope(message)
  ])
}

// Sends a transaction to the given channel; replies are addressed to
// responseChannel. Joining responseChannel is left to the caller.
function sendTransaction(channel, responseChannel, message) {
  push.send([
    channel
  , wireutil.transaction(responseChannel, message)
  ])
}

// Replaces a missing or "localhost" forward target with the address of the
// connected client, as resolved during the socket.io handshake.
function applyForwardTarget(data) {
  if (!data.targetHost || data.targetHost === 'localhost') {
    data.targetHost = socket.request.ip
  }
}

// Registers every client event handler for this connection. The promise
// resolves when the client disconnects, and rejects if registering the
// handlers throws. Either way the .finally() below releases all listeners
// and subscriptions held on behalf of the client.
new Promise(function(resolve) {
  socket.on('disconnect', resolve)
    // Touch events
    .on('input.touchDown', createTouchHandler(wire.TouchDownMessage))
    .on('input.touchMove', createTouchHandler(wire.TouchMoveMessage))
    .on('input.touchUp', createTouchHandler(wire.TouchUpMessage))
    .on('input.tap', createTouchHandler(wire.TapMessage))
    // Key events
    .on('input.keyDown', createKeyHandler(wire.KeyDownMessage))
    .on('input.keyUp', createKeyHandler(wire.KeyUpMessage))
    .on('input.keyPress', createKeyHandler(wire.KeyPressMessage))
    .on('input.type', function(channel, data) {
      sendMessage(channel, new wire.TypeMessage(data.text))
    })
    .on('display.rotate', function(channel, data) {
      sendMessage(channel, new wire.RotateMessage(data.rotation))
    })
    // Transactions
    .on('clipboard.paste', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.PasteMessage(data.text)
      )
    })
    .on('clipboard.copy', function(channel, responseChannel) {
      joinChannel(responseChannel)
      sendTransaction(channel, responseChannel, new wire.CopyMessage())
    })
    .on('device.identify', function(channel, responseChannel) {
      // The response channel is not joined for this transaction
      sendTransaction(
        channel
      , responseChannel
      , new wire.PhysicalIdentifyMessage()
      )
    })
    .on('group.invite', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.GroupMessage(
          new wire.OwnerMessage(
            user.email
          , user.name
          , user.group
          )
        , data.timeout || null
        , wireutil.toDeviceRequirements(data.requirements)
        )
      )
    })
    .on('group.kick', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.UngroupMessage(
          wireutil.toDeviceRequirements(data.requirements)
        )
      )
    })
    .on('tx.cleanup', function(channel) {
      leaveChannel(channel)
    })
    .on('tx.punch', function(channel) {
      joinChannel(channel)
      socket.emit('tx.punch', channel)
    })
    .on('shell.command', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.ShellCommandMessage(data)
      )
    })
    .on('shell.keepalive', function(channel, data) {
      sendMessage(channel, new wire.ShellKeepAliveMessage(data))
    })
    .on('device.install', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.InstallMessage(
          data.href
        , data.launch === true
        , JSON.stringify(data.manifest)
        )
      )
    })
    .on('device.uninstall', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.UninstallMessage(data)
      )
    })
    .on('storage.upload', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      request.postAsync({
          url: util.format(
            '%sapi/v1/resources?channel=%s'
          , options.storageUrl
          , responseChannel
          )
        , json: true
        , body: {
            url: data.url
          }
        })
        .catch(function(err) {
          log.error('Storage upload had an error', err.stack)
          leaveChannel(responseChannel)
          socket.emit('tx.cancel', responseChannel, {
            success: false
          , data: 'fail_upload'
          })
        })
    })
    .on('forward.test', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      applyForwardTarget(data)
      sendTransaction(
        channel
      , responseChannel
      , new wire.ForwardTestMessage(data)
      )
    })
    .on('forward.create', function(channel, responseChannel, data) {
      applyForwardTarget(data)
      dbapi.addUserForward(user.email, data)
        .then(function() {
          socket.emit('forward.create', data)
          joinChannel(responseChannel)
          sendTransaction(
            channel
          , responseChannel
          , new wire.ForwardCreateMessage(data)
          )
        })
        .catch(function(err) {
          // Without this handler the failure would go unnoticed
          log.error('Failed to create forward', err.stack)
        })
    })
    .on('forward.remove', function(channel, responseChannel, data) {
      dbapi.removeUserForward(user.email, data.devicePort)
        .then(function() {
          socket.emit('forward.remove', data)
          joinChannel(responseChannel)
          sendTransaction(
            channel
          , responseChannel
          , new wire.ForwardRemoveMessage(data)
          )
        })
        .catch(function(err) {
          // Without this handler the failure would go unnoticed
          log.error('Failed to remove forward', err.stack)
        })
    })
    .on('logcat.start', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.LogcatStartMessage(data)
      )
    })
    .on('logcat.stop', function(channel, responseChannel) {
      joinChannel(responseChannel)
      sendTransaction(channel, responseChannel, new wire.LogcatStopMessage())
    })
    .on('browser.open', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.BrowserOpenMessage(data)
      )
    })
    .on('browser.clear', function(channel, responseChannel, data) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.BrowserClearMessage(data)
      )
    })
    .on('store.open', function(channel, responseChannel) {
      joinChannel(responseChannel)
      sendTransaction(channel, responseChannel, new wire.StoreOpenMessage())
    })
    .on('screen.capture', function(channel, responseChannel) {
      joinChannel(responseChannel)
      sendTransaction(
        channel
      , responseChannel
      , new wire.ScreenCaptureMessage()
      )
    })
})
  .finally(function() {
    // Clean up all listeners and subscriptions
    channelRouter.removeListener(wireutil.global, messageListener)
    channels.forEach(function(channel) {
      channelRouter.removeListener(channel, messageListener)
      sub.unsubscribe(channel)
    })
  })
  .catch(function(err) {
    // Cannot guarantee integrity of client
    log.error(
      'Client had an error, disconnecting due to probable loss of integrity'
    , err.stack
    )
    socket.disconnect(true)
  })
/*
socket.on('flick', function(data) {})
socket.on('back', function(data) {})
socket.on('forward', function(data) {})
socket.on('refresh', function(data) {})
socket.on('internal.relaunch', function(data) {})
socket.on('internal.clear', function(data) {})
socket.on('selenium.setCookie', function(data) {})
socket.on('selenium.deleteCookie', function(data) {})
socket.on('selenium.deleteAllCookies', function(data) {})
socket.on('debug.benchmark.pull.start', function(data) {})
socket.on('debug.benchmark.pull.stop', function(data) {})
socket.on('logcat', function(data) {})
socket.on('debug.benchmark.pull.rate', function(data) {})
socket.on('cpu.monitor.load', function(data) {})
socket.on('safeExecute', function(data) {})
socket.on('eval', function(data) {})
socket.on('safeEval', function(data) {})
socket.on('executeAsync', function(data) {})
socket.on('safeExecuteAsync', function(data) {})
socket.on('execute', function(data) {})
socket.on('screenshot', function(data) {})
socket.on('selenium.screenshot', function(data) {})
socket.on('url', function(data) {})
socket.on('selenium.allCookies', function(data) {})
*/
//this._react 'selenium.weinre', =>
// this._runTransaction 'selenium.weinre',
// targetHost: conf.weinre.httpHost
// targetPort: conf.weinre.httpPort
})
// Start accepting HTTP and websocket connections on the configured port
var port = options.port
server.listen(port)
log.info('Listening on port %d', port)
}