mirror of
https://github.com/openstf/stf
synced 2025-10-04 18:29:17 +02:00
551 lines
16 KiB
JavaScript
551 lines
16 KiB
JavaScript
var util = require('util')
|
|
|
|
var Promise = require('bluebird')
|
|
var syrup = require('stf-syrup')
|
|
var WebSocket = require('ws')
|
|
var uuid = require('node-uuid')
|
|
var EventEmitter = require('eventemitter3').EventEmitter
|
|
var split = require('split')
|
|
var adbkit = require('adbkit')
|
|
|
|
var logger = require('../../../../util/logger')
|
|
var lifecycle = require('../../../../util/lifecycle')
|
|
var bannerutil = require('./util/banner')
|
|
var FrameParser = require('./util/frameparser')
|
|
var FrameConfig = require('./util/frameconfig')
|
|
var BroadcastSet = require('./util/broadcastset')
|
|
var StateQueue = require('../../../../util/statequeue')
|
|
var RiskyStream = require('../../../../util/riskystream')
|
|
var FailCounter = require('../../../../util/failcounter')
|
|
|
|
module.exports = syrup.serial()
|
|
.dependency(require('../../support/adb'))
|
|
.dependency(require('../../resources/minicap'))
|
|
.dependency(require('../display'))
|
|
.dependency(require('./options'))
|
|
.define(function(options, adb, minicap, display, screenOptions) {
|
|
var log = logger.createLogger('device:plugins:screen:stream')
|
|
var plugin = Object.create(null)
|
|
|
|
function FrameProducer(config) {
|
|
EventEmitter.call(this)
|
|
this.actionQueue = []
|
|
this.runningState = FrameProducer.STATE_STOPPED
|
|
this.desiredState = new StateQueue()
|
|
this.output = null
|
|
this.socket = null
|
|
this.banner = null
|
|
this.parser = null
|
|
this.frameConfig = config
|
|
this.readable = false
|
|
this.needsReadable = false
|
|
this.starter = Promise.resolve(true)
|
|
this.failCounter = new FailCounter(3, 10000)
|
|
this.failCounter.on('exceedLimit', this._failLimitExceeded.bind(this))
|
|
this.failed = false
|
|
this.readableListener = this._readableListener.bind(this)
|
|
}
|
|
|
|
util.inherits(FrameProducer, EventEmitter)
|
|
|
|
FrameProducer.STATE_STOPPED = 1
|
|
FrameProducer.STATE_STARTING = 2
|
|
FrameProducer.STATE_STARTED = 3
|
|
FrameProducer.STATE_STOPPING = 4
|
|
|
|
FrameProducer.prototype._ensureState = function() {
|
|
if (this.desiredState.empty()) {
|
|
return
|
|
}
|
|
|
|
if (this.failed) {
|
|
log.warn('Will not apply desired state due to too many failures')
|
|
return
|
|
}
|
|
|
|
switch (this.runningState) {
|
|
case FrameProducer.STATE_STARTING:
|
|
case FrameProducer.STATE_STOPPING:
|
|
// Just wait.
|
|
break
|
|
case FrameProducer.STATE_STOPPED:
|
|
if (this.desiredState.next() === FrameProducer.STATE_STARTED) {
|
|
this.runningState = FrameProducer.STATE_STARTING
|
|
this.starter = this._startService().bind(this)
|
|
.then(function(out) {
|
|
this.output = new RiskyStream(out)
|
|
.on('unexpectedEnd', this._outputEnded.bind(this))
|
|
return this._readOutput(this.output.stream)
|
|
})
|
|
.then(function() {
|
|
return this._connectService()
|
|
})
|
|
.then(function(socket) {
|
|
this.parser = new FrameParser()
|
|
this.socket = new RiskyStream(socket)
|
|
.on('unexpectedEnd', this._socketEnded.bind(this))
|
|
return this._readBanner(this.socket.stream)
|
|
})
|
|
.then(function(banner) {
|
|
this.banner = banner
|
|
return this._readFrames(this.socket.stream)
|
|
})
|
|
.then(function() {
|
|
this.runningState = FrameProducer.STATE_STARTED
|
|
this.emit('start')
|
|
})
|
|
.catch(Promise.CancellationError, function() {
|
|
return this._stop()
|
|
})
|
|
.catch(function(err) {
|
|
return this._stop().finally(function() {
|
|
this.failCounter.inc()
|
|
this.emit('error', err)
|
|
})
|
|
})
|
|
.finally(function() {
|
|
this._ensureState()
|
|
})
|
|
}
|
|
else {
|
|
setImmediate(this._ensureState.bind(this))
|
|
}
|
|
break
|
|
case FrameProducer.STATE_STARTED:
|
|
if (this.desiredState.next() === FrameProducer.STATE_STOPPED) {
|
|
this.runningState = FrameProducer.STATE_STOPPING
|
|
this._stop().finally(function() {
|
|
this._ensureState()
|
|
})
|
|
}
|
|
else {
|
|
setImmediate(this._ensureState.bind(this))
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
FrameProducer.prototype.start = function() {
|
|
log.info('Requesting frame producer to start')
|
|
this.desiredState.push(FrameProducer.STATE_STARTED)
|
|
this._ensureState()
|
|
}
|
|
|
|
FrameProducer.prototype.stop = function() {
|
|
log.info('Requesting frame producer to stop')
|
|
this.desiredState.push(FrameProducer.STATE_STOPPED)
|
|
this._ensureState()
|
|
}
|
|
|
|
FrameProducer.prototype.restart = function() {
|
|
switch (this.runningState) {
|
|
case FrameProducer.STATE_STARTED:
|
|
case FrameProducer.STATE_STARTING:
|
|
this.starter.cancel()
|
|
this.desiredState.push(FrameProducer.STATE_STOPPED)
|
|
this.desiredState.push(FrameProducer.STATE_STARTED)
|
|
this._ensureState()
|
|
break
|
|
}
|
|
}
|
|
|
|
FrameProducer.prototype.updateRotation = function(rotation) {
|
|
if (this.frameConfig.rotation === rotation) {
|
|
log.info('Keeping %d as current frame producer rotation', rotation)
|
|
return
|
|
}
|
|
|
|
log.info('Setting frame producer rotation to %d', rotation)
|
|
this.frameConfig.rotation = rotation
|
|
this._configChanged()
|
|
}
|
|
|
|
FrameProducer.prototype.updateProjection = function(width, height) {
|
|
if (this.frameConfig.virtualWidth === width &&
|
|
this.frameConfig.virtualHeight === height) {
|
|
log.info(
|
|
'Keeping %dx%d as current frame producer projection', width, height)
|
|
return
|
|
}
|
|
|
|
log.info('Setting frame producer projection to %dx%d', width, height)
|
|
this.frameConfig.virtualWidth = width
|
|
this.frameConfig.virtualHeight = height
|
|
this._configChanged()
|
|
}
|
|
|
|
FrameProducer.prototype.nextFrame = function() {
|
|
var frame = null, chunk
|
|
|
|
if (this.parser) {
|
|
while ((frame = this.parser.nextFrame()) === null) {
|
|
if ((chunk = this.socket.stream.read())) {
|
|
this.parser.push(chunk)
|
|
}
|
|
else {
|
|
this.readable = false
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return frame
|
|
}
|
|
|
|
FrameProducer.prototype.needFrame = function() {
|
|
this.needsReadable = true
|
|
this._maybeEmitReadable()
|
|
}
|
|
|
|
FrameProducer.prototype._configChanged = function() {
|
|
this.restart()
|
|
}
|
|
|
|
FrameProducer.prototype._socketEnded = function() {
|
|
log.warn('Connection to minicap ended unexpectedly')
|
|
this.failCounter.inc()
|
|
this.restart()
|
|
}
|
|
|
|
FrameProducer.prototype._outputEnded = function() {
|
|
log.warn('Shell keeping minicap running ended unexpectedly')
|
|
this.failCounter.inc()
|
|
this.restart()
|
|
}
|
|
|
|
FrameProducer.prototype._failLimitExceeded = function(limit, time) {
|
|
this._stop()
|
|
this.failed = true
|
|
this.emit('error', new Error(util.format(
|
|
'Failed more than %d times in %dms'
|
|
, limit
|
|
, time
|
|
)))
|
|
}
|
|
|
|
FrameProducer.prototype._startService = function() {
|
|
log.info('Launching screen service')
|
|
return minicap.run(util.format('-S -P %s', this.frameConfig.toString()))
|
|
.timeout(10000)
|
|
}
|
|
|
|
FrameProducer.prototype._readOutput = function(out) {
|
|
out.pipe(split()).on('data', function(line) {
|
|
var trimmed = line.toString().trim()
|
|
|
|
if (trimmed === '') {
|
|
return
|
|
}
|
|
|
|
if (/ERROR/.test(line)) {
|
|
log.fatal('minicap error: "%s"', line)
|
|
return lifecycle.fatal()
|
|
}
|
|
|
|
log.info('minicap says: "%s"', line)
|
|
})
|
|
}
|
|
|
|
FrameProducer.prototype._connectService = function() {
|
|
function tryConnect(times, delay) {
|
|
return adb.openLocal(options.serial, 'localabstract:minicap')
|
|
.timeout(10000)
|
|
.then(function(out) {
|
|
return out
|
|
})
|
|
.catch(function(err) {
|
|
if (/closed/.test(err.message) && times > 1) {
|
|
return Promise.delay(delay)
|
|
.then(function() {
|
|
return tryConnect(--times, delay * 2)
|
|
})
|
|
}
|
|
return Promise.reject(err)
|
|
})
|
|
}
|
|
log.info('Connecting to minicap service')
|
|
return tryConnect(5, 100)
|
|
}
|
|
|
|
FrameProducer.prototype._stop = function() {
|
|
return this._disconnectService(this.socket).bind(this)
|
|
.timeout(2000)
|
|
.then(function() {
|
|
return this._stopService(this.output).timeout(10000)
|
|
})
|
|
.then(function() {
|
|
this.runningState = FrameProducer.STATE_STOPPED
|
|
this.emit('stop')
|
|
})
|
|
.catch(function(err) {
|
|
// In practice we _should_ never get here due to _stopService()
|
|
// being quite aggressive. But if we do, well... assume it
|
|
// stopped anyway for now.
|
|
this.runningState = FrameProducer.STATE_STOPPED
|
|
this.emit('error', err)
|
|
this.emit('stop')
|
|
})
|
|
.finally(function() {
|
|
this.output = null
|
|
this.socket = null
|
|
this.banner = null
|
|
this.parser = null
|
|
})
|
|
}
|
|
|
|
FrameProducer.prototype._disconnectService = function(socket) {
|
|
log.info('Disconnecting from minicap service')
|
|
|
|
if (!socket || socket.ended) {
|
|
return Promise.resolve(true)
|
|
}
|
|
|
|
socket.stream.removeListener('readable', this.readableListener)
|
|
|
|
var endListener
|
|
return new Promise(function(resolve/*, reject*/) {
|
|
socket.on('end', endListener = function() {
|
|
resolve(true)
|
|
})
|
|
|
|
socket.stream.resume()
|
|
socket.end()
|
|
})
|
|
.finally(function() {
|
|
socket.removeListener('end', endListener)
|
|
})
|
|
}
|
|
|
|
FrameProducer.prototype._stopService = function(output) {
|
|
log.info('Stopping minicap service')
|
|
|
|
if (!output || output.ended) {
|
|
return Promise.resolve(true)
|
|
}
|
|
|
|
var pid = this.banner ? this.banner.pid : -1
|
|
|
|
function waitForEnd() {
|
|
var endListener
|
|
return new Promise(function(resolve/*, reject*/) {
|
|
output.expectEnd().on('end', endListener = function() {
|
|
resolve(true)
|
|
})
|
|
})
|
|
.finally(function() {
|
|
output.removeListener('end', endListener)
|
|
})
|
|
}
|
|
|
|
function kill(signal) {
|
|
if (pid <= 0) {
|
|
return Promise.reject(new Error('Minicap service pid is unknown'))
|
|
}
|
|
|
|
log.info('Sending SIGTERM to minicap')
|
|
return Promise.all([
|
|
waitForEnd()
|
|
, adb.shell(options.serial, ['kill', signal, pid])
|
|
.then(adbkit.util.readAll)
|
|
.timeout(2000)
|
|
.return(true)
|
|
])
|
|
}
|
|
|
|
function kindKill() {
|
|
return kill('-15')
|
|
}
|
|
|
|
function forceKill() {
|
|
return kill('-9')
|
|
}
|
|
|
|
function forceEnd() {
|
|
log.info('Ending minicap I/O as a last resort')
|
|
output.end()
|
|
return Promise.resolve(true)
|
|
}
|
|
|
|
return kindKill()
|
|
.catch(Promise.TimeoutError, forceKill)
|
|
.catch(forceEnd)
|
|
}
|
|
|
|
FrameProducer.prototype._readBanner = function(socket) {
|
|
log.info('Reading minicap banner')
|
|
return bannerutil.read(socket).timeout(2000)
|
|
}
|
|
|
|
FrameProducer.prototype._readFrames = function(socket) {
|
|
this.needsReadable = true
|
|
socket.on('readable', this.readableListener)
|
|
|
|
// We may already have data pending. Let the user know they should
|
|
// at least attempt to read frames now.
|
|
this.readableListener()
|
|
}
|
|
|
|
FrameProducer.prototype._maybeEmitReadable = function() {
|
|
if (this.readable && this.needsReadable) {
|
|
this.needsReadable = false
|
|
this.emit('readable')
|
|
}
|
|
}
|
|
|
|
FrameProducer.prototype._readableListener = function() {
|
|
this.readable = true
|
|
this._maybeEmitReadable()
|
|
}
|
|
|
|
function createServer() {
|
|
log.info('Starting WebSocket server on port %d', screenOptions.publicPort)
|
|
|
|
var wss = new WebSocket.Server({
|
|
port: screenOptions.publicPort
|
|
, perMessageDeflate: false
|
|
})
|
|
|
|
var listeningListener, errorListener
|
|
return new Promise(function(resolve, reject) {
|
|
listeningListener = function() {
|
|
return resolve(wss)
|
|
}
|
|
|
|
errorListener = function(err) {
|
|
return reject(err)
|
|
}
|
|
|
|
wss.on('listening', listeningListener)
|
|
wss.on('error', errorListener)
|
|
})
|
|
.finally(function() {
|
|
wss.removeListener('listening', listeningListener)
|
|
wss.removeListener('error', errorListener)
|
|
})
|
|
}
|
|
|
|
return createServer()
|
|
.then(function(wss) {
|
|
var broadcastSet = new BroadcastSet()
|
|
var frameProducer = new FrameProducer(
|
|
new FrameConfig(display.properties, display.properties))
|
|
|
|
broadcastSet.on('nonempty', function() {
|
|
frameProducer.start()
|
|
})
|
|
|
|
broadcastSet.on('empty', function() {
|
|
frameProducer.stop()
|
|
})
|
|
|
|
display.on('rotationChange', function(newRotation) {
|
|
frameProducer.updateRotation(newRotation)
|
|
})
|
|
|
|
frameProducer.on('start', function() {
|
|
var message = util.format(
|
|
'start %s'
|
|
, JSON.stringify(frameProducer.banner)
|
|
)
|
|
|
|
broadcastSet.keys().forEach(function(id) {
|
|
var ws = broadcastSet.get(id)
|
|
switch (ws.readyState) {
|
|
case WebSocket.OPENING:
|
|
// This should never happen.
|
|
log.warn('Unable to send banner to OPENING client "%s"', id)
|
|
break
|
|
case WebSocket.OPEN:
|
|
// This is what SHOULD happen.
|
|
ws.send(message)
|
|
break
|
|
case WebSocket.CLOSING:
|
|
// Ok, a 'close' event should remove the client from the set
|
|
// soon.
|
|
break
|
|
case WebSocket.CLOSED:
|
|
// This should never happen.
|
|
log.warn('Unable to send banner to CLOSED client "%s"', id)
|
|
broadcastSet.remove(id)
|
|
break
|
|
}
|
|
})
|
|
})
|
|
|
|
frameProducer.on('readable', function next() {
|
|
var frame
|
|
if ((frame = frameProducer.nextFrame())) {
|
|
Promise.settle([broadcastSet.keys().map(function(id) {
|
|
return new Promise(function(resolve, reject) {
|
|
var ws = broadcastSet.get(id)
|
|
switch (ws.readyState) {
|
|
case WebSocket.OPENING:
|
|
// This should never happen.
|
|
return reject(new Error(util.format(
|
|
'Unable to send frame to OPENING client "%s"', id)))
|
|
case WebSocket.OPEN:
|
|
// This is what SHOULD happen.
|
|
ws.send(frame, {
|
|
binary: true
|
|
}, function(err) {
|
|
return err ? reject(err) : resolve()
|
|
})
|
|
return
|
|
case WebSocket.CLOSING:
|
|
// Ok, a 'close' event should remove the client from the set
|
|
// soon.
|
|
return
|
|
case WebSocket.CLOSED:
|
|
// This should never happen.
|
|
broadcastSet.remove(id)
|
|
return reject(new Error(util.format(
|
|
'Unable to send frame to CLOSED client "%s"', id)))
|
|
}
|
|
})
|
|
})]).then(next)
|
|
}
|
|
else {
|
|
frameProducer.needFrame()
|
|
}
|
|
})
|
|
|
|
frameProducer.on('error', function(err) {
|
|
log.fatal('Frame producer had an error', err.stack)
|
|
lifecycle.fatal()
|
|
})
|
|
|
|
wss.on('connection', function(ws) {
|
|
var id = uuid.v4()
|
|
|
|
ws.on('message', function(data) {
|
|
var match
|
|
if ((match = /^(on|off|(size) ([0-9]+)x([0-9]+))$/.exec(data))) {
|
|
switch (match[2] || match[1]) {
|
|
case 'on':
|
|
broadcastSet.insert(id, ws)
|
|
break
|
|
case 'off':
|
|
broadcastSet.remove(id)
|
|
break
|
|
case 'size':
|
|
frameProducer.updateProjection(+match[3], +match[4])
|
|
break
|
|
}
|
|
}
|
|
})
|
|
|
|
ws.on('close', function() {
|
|
broadcastSet.remove(id)
|
|
})
|
|
})
|
|
|
|
lifecycle.observe(function() {
|
|
wss.close()
|
|
})
|
|
|
|
lifecycle.observe(function() {
|
|
frameProducer.stop()
|
|
})
|
|
})
|
|
.return(plugin)
|
|
})
|