diff --git a/lib/units/device/plugins/screen/stream.js b/lib/units/device/plugins/screen/stream.js index f75069fa..bc9e5b7d 100644 --- a/lib/units/device/plugins/screen/stream.js +++ b/lib/units/device/plugins/screen/stream.js @@ -443,7 +443,7 @@ module.exports = syrup.serial() return createServer() .then(function(wss) { - var broadcastSet = new BroadcastSet() + var broadcastSet = plugin.broadcastSet = new BroadcastSet() var frameProducer = new FrameProducer( new FrameConfig(display.properties, display.properties)) @@ -460,32 +460,8 @@ module.exports = syrup.serial() }) frameProducer.on('start', function() { - var message = util.format( - 'start %s' - , JSON.stringify(frameProducer.banner) - ) - - broadcastSet.keys().forEach(function(id) { - var ws = broadcastSet.get(id) - switch (ws.readyState) { - case WebSocket.OPENING: - // This should never happen. - log.warn('Unable to send banner to OPENING client "%s"', id) - break - case WebSocket.OPEN: - // This is what SHOULD happen. - ws.send(message) - break - case WebSocket.CLOSING: - // Ok, a 'close' event should remove the client from the set - // soon. - break - case WebSocket.CLOSED: - // This should never happen. - log.warn('Unable to send banner to CLOSED client "%s"', id) - broadcastSet.remove(id) - break - } + broadcastSet.keys().map(function(id) { + return broadcastSet.get(id).onStart(frameProducer) }) }) @@ -493,32 +469,7 @@ module.exports = syrup.serial() var frame if ((frame = frameProducer.nextFrame())) { Promise.settle([broadcastSet.keys().map(function(id) { - return new Promise(function(resolve, reject) { - var ws = broadcastSet.get(id) - switch (ws.readyState) { - case WebSocket.OPENING: - // This should never happen. - return reject(new Error(util.format( - 'Unable to send frame to OPENING client "%s"', id))) - case WebSocket.OPEN: - // This is what SHOULD happen. - ws.send(frame, { - binary: true - }, function(err) { - return err ? reject(err) : resolve() - }) - return - case WebSocket.CLOSING: - // Ok, a 'close' event should remove the client from the set - // soon. - return - case WebSocket.CLOSED: - // This should never happen. - broadcastSet.remove(id) - return reject(new Error(util.format( - 'Unable to send frame to CLOSED client "%s"', id))) - } - }) + return broadcastSet.get(id).onFrame(frame) })]).then(next) } else { @@ -534,12 +485,74 @@ module.exports = syrup.serial() wss.on('connection', function(ws) { var id = uuid.v4() + function wsStartNotifier() { + return new Promise(function(resolve, reject) { + var message = util.format( + 'start %s' + , JSON.stringify(frameProducer.banner) + ) + + switch (ws.readyState) { + case WebSocket.OPENING: + // This should never happen. + log.warn('Unable to send banner to OPENING client "%s"', id) + break + case WebSocket.OPEN: + // This is what SHOULD happen. + ws.send(message, function(err) { + return err ? reject(err) : resolve() + }) + break + case WebSocket.CLOSING: + // Ok, a 'close' event should remove the client from the set + // soon. + break + case WebSocket.CLOSED: + // This should never happen. + log.warn('Unable to send banner to CLOSED client "%s"', id) + broadcastSet.remove(id) + break + } + }) + } + + function wsFrameNotifier(frame) { + return new Promise(function(resolve, reject) { + switch (ws.readyState) { + case WebSocket.OPENING: + // This should never happen. + return reject(new Error(util.format( + 'Unable to send frame to OPENING client "%s"', id))) + case WebSocket.OPEN: + // This is what SHOULD happen. + ws.send(frame, { + binary: true + }, function(err) { + return err ? reject(err) : resolve() + }) + return + case WebSocket.CLOSING: + // Ok, a 'close' event should remove the client from the set + // soon. + return + case WebSocket.CLOSED: + // This should never happen. + broadcastSet.remove(id) + return reject(new Error(util.format( + 'Unable to send frame to CLOSED client "%s"', id))) + } + }) + } + ws.on('message', function(data) { var match if ((match = /^(on|off|(size) ([0-9]+)x([0-9]+))$/.exec(data))) { switch (match[2] || match[1]) { case 'on': - broadcastSet.insert(id, ws) + broadcastSet.insert(id, { + onStart: wsStartNotifier + , onFrame: wsFrameNotifier + }) break case 'off': broadcastSet.remove(id) diff --git a/lib/units/device/plugins/vnc/index.js b/lib/units/device/plugins/vnc/index.js index 51b93286..66581f3b 100644 --- a/lib/units/device/plugins/vnc/index.js +++ b/lib/units/device/plugins/vnc/index.js @@ -1,10 +1,125 @@ +var net = require('net') +var util = require('util') + var syrup = require('stf-syrup') var Promise = require('bluebird') -var _ = require('lodash') +var uuid = require('node-uuid') +var jpeg = require('jpeg-js') -var logger = require('../../../util/logger') +var logger = require('../../../../util/logger') +var lifecycle = require('../../../../util/lifecycle') + +var VncServer = require('./util/server') +var VncConnection = require('./util/connection') module.exports = syrup.serial() - .define(function(options) { + .dependency(require('../screen/stream')) + .define(function(options, screenStream) { + var log = logger.createLogger('device:plugins:vnc') + function createServer() { + log.info('Starting VNC server on port %d', options.vncPort) + + var vnc = new VncServer(net.createServer({ + allowHalfOpen: true + })) + + var listeningListener, errorListener + return new Promise(function(resolve, reject) { + listeningListener = function() { + return resolve(vnc) + } + + errorListener = function(err) { + return reject(err) + } + + vnc.on('listening', listeningListener) + vnc.on('error', errorListener) + + vnc.listen(options.vncPort) + }) + .finally(function() { + vnc.removeListener('listening', listeningListener) + vnc.removeListener('error', errorListener) + }) + } + + return createServer() + .then(function(vnc) { + vnc.on('connection', function(conn) { + var id = util.format('vnc-%s', uuid.v4()) + + var connState = { + lastFrame: null + , lastFrameTime: null + , frameWidth: 0 + , frameHeight: 0 + , sentFrameTime: null + , updateRequests: 0 + } + + function vncStartListener(frameProducer) { + return new Promise(function(resolve/*, reject*/) { + connState.frameWidth = frameProducer.banner.virtualWidth + connState.frameHeight = frameProducer.banner.virtualHeight + resolve() + }) + } + + function vncFrameListener(frame) { + return new Promise(function(resolve/*, reject*/) { + connState.lastFrame = frame + connState.lastFrameTime = Date.now() + maybeSendFrame() + resolve() + }) + } + + function maybeSendFrame() { + if (!connState.updateRequests) { + return + } + + if (!connState.lastFrame) { + return + } + + if (connState.lastFrameTime === connState.sentFrameTime) { + return + } + + var decoded = jpeg.decode(connState.lastFrame) + conn.writeFramebufferUpdate([{ + xPosition: 0 + , yPosition: 0 + , width: connState.frameWidth + , height: connState.frameHeight + , encodingType: VncConnection.ENCODING_RAW + , data: decoded.data + }]) + + connState.updateRequests = 0 + connState.sentFrameTime = connState.lastFrameTime + } + + screenStream.broadcastSet.insert(id, { + onStart: vncStartListener + , onFrame: vncFrameListener + }) + + conn.on('fbupdaterequest', function() { + connState.updateRequests += 1 + maybeSendFrame() + }) + + conn.on('close', function() { + screenStream.broadcastSet.remove(id) + }) + }) + + lifecycle.observe(function() { + vnc.close() + }) + }) }) diff --git a/lib/units/device/plugins/vnc/util/connection.js b/lib/units/device/plugins/vnc/util/connection.js index 8e55fa4b..80ac88e8 100644 --- a/lib/units/device/plugins/vnc/util/connection.js +++ b/lib/units/device/plugins/vnc/util/connection.js @@ -7,7 +7,10 @@ var PixelFormat = require('./pixelformat') function VncConnection(conn) { this._bound = { - _readableListener: this._readableListener.bind(this) + _errorListener: this._errorListener.bind(this) + , _readableListener: this._readableListener.bind(this) + , _endListener: this._endListener.bind(this) + , _closeListener: this._closeListener.bind(this) } this._buffer = null @@ -16,12 +19,12 @@ function VncConnection(conn) { this._serverVersion = VncConnection.V3_008 this._serverSupportedSecurity = [VncConnection.SECURITY_NONE] - this._serverWidth = 800 - this._serverHeight = 600 + this._serverWidth = 720 + this._serverHeight = 1280 this._serverPixelFormat = new PixelFormat({ bitsPerPixel: 32 , depth: 24 - , bigEndianFlag: 1 + , bigEndianFlag: 0 , trueColorFlag: 1 , redMax: 255 , greenMax: 255 @@ -30,6 +33,7 @@ function VncConnection(conn) { , greenShift: 8 , blueShift: 0 }) + this._requireServerPixelFormat = true this._serverName = 'stf' this._clientVersion = null @@ -42,7 +46,10 @@ function VncConnection(conn) { this._clientCutTextLength = 0 this.conn = conn + .on('error', this._bound._errorListener) .on('readable', this._bound._readableListener) + .on('end', this._bound._endListener) + .on('close', this._bound._closeListener) this._writeServerVersion() this._read() @@ -67,6 +74,8 @@ VncConnection.CLIENT_MESSAGE_KEYEVENT = 4 VncConnection.CLIENT_MESSAGE_POINTEREVENT = 5 VncConnection.CLIENT_MESSAGE_CLIENTCUTTEXT = 6 +VncConnection.SERVER_MESSAGE_FBUPDATE = 0 + var StateReverse = Object.create(null), State = { STATE_NEED_CLIENT_VERSION: 10 , STATE_NEED_CLIENT_SECURITY: 20 @@ -82,11 +91,61 @@ var StateReverse = Object.create(null), State = { , STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT_VALUE: 101 } +VncConnection.ENCODING_RAW = 0 + Object.keys(State).map(function(name) { VncConnection[name] = State[name] StateReverse[State[name]] = name }) +VncConnection.prototype.end = function() { + this.conn.end() +} + +VncConnection.prototype.writeFramebufferUpdate = function(rectangles) { + var chunk = new Buffer(4) + chunk[0] = VncConnection.SERVER_MESSAGE_FBUPDATE + chunk[1] = 0 + chunk.writeUInt16BE(rectangles.length, 2) + this._write(chunk) + + rectangles.forEach(function(rect) { + var chunk = new Buffer(12) + chunk.writeUInt16BE(rect.xPosition, 0) + chunk.writeUInt16BE(rect.yPosition, 2) + chunk.writeUInt16BE(rect.width, 4) + chunk.writeUInt16BE(rect.height, 6) + chunk.writeInt32BE(rect.encodingType, 8) + this._write(chunk) + + switch (rect.encodingType) { + case VncConnection.ENCODING_RAW: + this._write(rect.data) + break + default: + throw new Error(util.format( + 'Unsupported encoding type', rect.encodingType)) + } + }, this) +} + +VncConnection.prototype._error = function(err) { + this.emit('error', err) + this.end() +} + +VncConnection.prototype._errorListener = function(err) { + this._error(err) +} + +VncConnection.prototype._endListener = function() { + this.emit('end') +} + +VncConnection.prototype._closeListener = function() { + this.emit('close') +} + VncConnection.prototype._writeServerVersion = function() { // Yes, we could just format the string instead. Didn't feel like it. switch (this._serverVersion) { @@ -171,7 +230,10 @@ VncConnection.prototype._read = function() { switch (this._state) { case VncConnection.STATE_NEED_CLIENT_VERSION: if ((chunk = this._consume(12))) { - this._clientVersion = this._parseVersion(chunk) + if ((this._clientVersion = this._parseVersion(chunk)) === null) { + this.end() + return + } debug('client version', this._clientVersion) this._writeSupportedSecurity() this._changeState(VncConnection.STATE_NEED_CLIENT_SECURITY) @@ -179,7 +241,12 @@ VncConnection.prototype._read = function() { break case VncConnection.STATE_NEED_CLIENT_SECURITY: if ((chunk = this._consume(1))) { - this._clientSecurity = this._parseSecurity(chunk) + if ((this._clientSecurity = this._parseSecurity(chunk)) === null) { + this._writeSecurityResult( + VncConnection.SECURITYRESULT_FAIL, 'Unsupported security type') + this.end() + return + } debug('client security', this._clientSecurity) this._writeSecurityResult(VncConnection.SECURITYRESULT_OK) this._changeState(VncConnection.STATE_NEED_CLIENT_INIT) @@ -197,25 +264,33 @@ VncConnection.prototype._read = function() { if ((chunk = this._consume(1))) { switch (chunk[0]) { case VncConnection.CLIENT_MESSAGE_SETPIXELFORMAT: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_SETPIXELFORMAT) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_SETPIXELFORMAT) break case VncConnection.CLIENT_MESSAGE_SETENCODINGS: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_SETENCODINGS) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_SETENCODINGS) break case VncConnection.CLIENT_MESSAGE_FBUPDATEREQUEST: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_FBUPDATEREQUEST) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_FBUPDATEREQUEST) break case VncConnection.CLIENT_MESSAGE_KEYEVENT: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_KEYEVENT) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_KEYEVENT) break case VncConnection.CLIENT_MESSAGE_POINTEREVENT: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_POINTEREVENT) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_POINTEREVENT) break case VncConnection.CLIENT_MESSAGE_CLIENTCUTTEXT: - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT) break default: - throw new Error(util.format('Unsupported message type %d', chunk[0])) + this._error(new Error(util.format( + 'Unsupported message type %d', chunk[0]))) + return } } break @@ -236,6 +311,12 @@ VncConnection.prototype._read = function() { }) // [16b, 19b) padding debug('client pixel format', this._clientPixelFormat) + if (this._requireServerPixelFormat && + this._clientPixelFormat.bitsPerPixel < + this._serverPixelFormat.bitsPerPixel) { + this.end() + return + } this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE) } break @@ -243,7 +324,8 @@ VncConnection.prototype._read = function() { if ((chunk = this._consume(3))) { // [0b, 1b) padding this._clientEncodingCount = chunk.readUInt16BE(1, true) - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_SETENCODINGS_VALUE) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_SETENCODINGS_VALUE) } break case VncConnection.STATE_NEED_CLIENT_MESSAGE_SETENCODINGS_VALUE: @@ -261,11 +343,13 @@ VncConnection.prototype._read = function() { break case VncConnection.STATE_NEED_CLIENT_MESSAGE_FBUPDATEREQUEST: if ((chunk = this._consume(9))) { - // incremental = chunk[0] - // xPosition = chunk.readUInt16BE(1, true) - // yPosition = chunk.readUInt16BE(3, true) - // width = chunk.readUInt16BE(5, true) - // height = chunk.readUInt16BE(7, true) + this.emit('fbupdaterequest', { + incremental: chunk[0] + , xPosition: chunk.readUInt16BE(1, true) + , yPosition: chunk.readUInt16BE(3, true) + , width: chunk.readUInt16BE(5, true) + , height: chunk.readUInt16BE(7, true) + }) this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE) } break @@ -289,7 +373,8 @@ VncConnection.prototype._read = function() { if ((chunk = this._consume(7))) { // [0b, 3b) padding this._clientCutTextLength = chunk.readUInt32BE(3) - this._changeState(VncConnection.STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT_VALUE) + this._changeState( + VncConnection.STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT_VALUE) } break case VncConnection.STATE_NEED_CLIENT_MESSAGE_CLIENTCUTTEXT_VALUE: @@ -319,7 +404,7 @@ VncConnection.prototype._parseVersion = function(chunk) { return VncConnection.V3_003 } - throw new Error('Unsupported version') + return null } VncConnection.prototype._parseSecurity = function(chunk) { @@ -328,7 +413,7 @@ VncConnection.prototype._parseSecurity = function(chunk) { case VncConnection.SECURITY_VNC: return chunk[0] default: - throw new Error('Unsupported security type') + return null } } diff --git a/lib/units/device/plugins/vnc/util/example.js b/lib/units/device/plugins/vnc/util/example.js deleted file mode 100644 index 6ed338e0..00000000 --- a/lib/units/device/plugins/vnc/util/example.js +++ /dev/null @@ -1,10 +0,0 @@ -var net = require('net') -var VncServer = require('./server') - -var nserv = net.createServer({ - allowHalfOpen: true -}) - -var vserv = new VncServer(nserv) - -nserv.listen(5910) diff --git a/lib/units/device/plugins/vnc/util/server.js b/lib/units/device/plugins/vnc/util/server.js index 2c8f6162..33b6c8e2 100644 --- a/lib/units/device/plugins/vnc/util/server.js +++ b/lib/units/device/plugins/vnc/util/server.js @@ -1,4 +1,3 @@ -var net = require('net') var util = require('util') var EventEmitter = require('eventemitter3').EventEmitter @@ -8,18 +7,44 @@ var VncConnection = require('./connection') function VncServer(server) { this._bound = { - _connectionListener: this._connectionListener.bind(this) + _listeningListener: this._listeningListener.bind(this) + , _connectionListener: this._connectionListener.bind(this) + , _closeListener: this._closeListener.bind(this) + , _errorListener: this._errorListener.bind(this) } this.server = server + .on('listening', this._bound._listeningListener) .on('connection', this._bound._connectionListener) + .on('close', this._bound._closeListener) + .on('error', this._bound._errorListener) } util.inherits(VncServer, EventEmitter) +VncServer.prototype.close = function() { + this.server.close() +} + +VncServer.prototype.listen = function() { + this.server.listen.apply(this.server, arguments) +} + +VncServer.prototype._listeningListener = function() { + this.emit('listening') +} + VncServer.prototype._connectionListener = function(conn) { debug('connection', conn.remoteAddress, conn.remotePort) - new VncConnection(conn) + this.emit('connection', new VncConnection(conn)) +} + +VncServer.prototype._closeListener = function() { + this.emit('close') +} + +VncServer.prototype._errorListener = function(err) { + this.emit('error', err) } module.exports = VncServer