From 4bbcaa45db8da9647e2360f044b4423f29330558 Mon Sep 17 00:00:00 2001 From: Simo Kinnunen Date: Tue, 14 Oct 2014 20:00:49 +0900 Subject: [PATCH] Separate ForwardManager into its own file and attempt to make it a bit more resilient. --- lib/units/device/plugins/forward/index.js | 121 ++++---------- .../device/plugins/forward/util/manager.js | 158 ++++++++++++++++++ lib/wire/wire.proto | 14 ++ 3 files changed, 201 insertions(+), 92 deletions(-) create mode 100644 lib/units/device/plugins/forward/util/manager.js diff --git a/lib/units/device/plugins/forward/index.js b/lib/units/device/plugins/forward/index.js index ab844127..127b3f30 100644 --- a/lib/units/device/plugins/forward/index.js +++ b/lib/units/device/plugins/forward/index.js @@ -9,8 +9,7 @@ var lifecycle = require('../../../../util/lifecycle') var streamutil = require('../../../../util/streamutil') var wireutil = require('../../../../wire/util') -var ForwardReader = require('./util/reader') -var ForwardWriter = require('./util/writer') +var ForwardManager = require('./util/manager') module.exports = syrup.serial() .dependency(require('../../support/adb')) @@ -21,96 +20,6 @@ module.exports = syrup.serial() .define(function(options, adb, router, push, minirev, group) { var log = logger.createLogger('device:plugins:forward') var plugin = Object.create(null) - - function ForwardManager() { - var forwards = Object.create(null) - - function Forward(conn, to) { - var proxies = Object.create(null) - - function Proxy(fd) { - function maybeSend() { - var chunk - while ((chunk = this.read())) { - if (!conn.write(chunk)) { - break - } - } - } - - function killListeners() { - src.removeListener('readable', maybeSend) - conn.removeListener('drain', maybeSend) - conn.removeListener('end', killListeners) - } - - var src = new ForwardWriter(fd) - .on('readable', maybeSend) - .on('error', function(err) { - log.error('Proxy writer %d had an error', fd, to, err.stack) - }) - - conn.on('drain', maybeSend) - conn.on('end', killListeners) - - this.dest = net.connect(to) - .once('end', function() { - delete proxies[fd] - killListeners() - }) - .on('error', function(err) { - log.error('Proxy reader %d had an error', fd, to, err.stack) - }) - - this.dest.pipe(src) - - this.stop = function() { - this.dest.end() - } - } - - conn.pipe(new ForwardReader()) - .on('packet', function(fd, packet) { - var proxy = proxies[fd] - - if (packet) { - if (!proxy) { - // New connection - proxy = proxies[fd] = new Proxy(fd) - } - - proxy.dest.write(packet) - } - else { - // The connection ended - if (proxy) { - proxy.stop() - } - } - }) - - this.end = function() { - conn.end() - } - } - - this.add = function(port, conn, to) { - forwards[port] = new Forward(conn, to) - } - - this.remove = function(port) { - if (forwards[port]) { - forwards[port].end() - } - } - - this.removeAll = function() { - Object.keys(forwards).forEach(function(port) { - forwards[port].end() - }) - } - } - var manager = new ForwardManager() function startService() { @@ -203,9 +112,37 @@ module.exports = syrup.serial() group.on('leave', plugin.reset) + manager.on('add', function(port, to) { + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceForwardAddEvent( + options.serial + , port + , to.host + , to.port + )) + ]) + }) + + manager.on('remove', function(port) { + push.send([ + wireutil.global + , wireutil.envelope(new wire.DeviceForwardRemoveEvent( + options.serial + , port + )) + ]) + }) + return startService() .then(awaitServer) .then(function() { + + plugin.createForward(3000, { + host: '127.0.0.1' + , port: 3000 + }) + router .on(wire.ForwardTestMessage, function(channel, message) { var reply = wireutil.reply(options.serial) diff --git a/lib/units/device/plugins/forward/util/manager.js b/lib/units/device/plugins/forward/util/manager.js new file mode 100644 index 00000000..8bd2d460 --- /dev/null +++ b/lib/units/device/plugins/forward/util/manager.js @@ -0,0 +1,158 @@ +var util = require('util') +var events = require('events') +var net = require('net') + +var ForwardReader = require('./reader') +var ForwardWriter = require('./writer') + +// Handles multiple ports +function ForwardManager() { + var handlersByPort = Object.create(null) + + this.has = function(port) { + return !!handlersByPort[port] + } + + this.add = function(port, conn, to) { + function endListener() { + delete handlersByPort[port] + this.emit('remove', port, to) + } + + var handler = new ForwardHandler(conn, to) + handler.on('end', endListener.bind(this)) + + handlersByPort[port] = handler + + this.emit('add', port, to) + } + + this.remove = function(port) { + var handler = handlersByPort[port] + if (handler) { + handler.end() + } + } + + this.removeAll = function() { + Object.keys(handlersByPort).forEach(function(port) { + handlersByPort[port].end() + }) + } + + this.listAll = function() { + return Object.keys(handlersByPort).map(function(port) { + var handler = handlersByPort[port] + return { + port: port + , to: handler.to + } + }) + } + + events.EventEmitter.call(this) +} + +util.inherits(ForwardManager, events.EventEmitter) + +// Handles a single port +function ForwardHandler(conn, to) { + var destHandlersById = Object.create(null) + + function endListener() { + this.emit('end') + } + + function packetEndListener(id) { + delete destHandlersById[id] + } + + function packetListener(id, packet) { + var dest = destHandlersById[id] + + if (packet) { + if (!dest) { + // Let's create a new connection + dest = destHandlersById[id] = new DestHandler(id, conn, to) + dest.on('end', packetEndListener.bind(null, id)) + } + + dest.write(packet) + } + else { + // It's a simulated fin packet + if (dest) { + dest.end() + } + } + } + + conn.pipe(new ForwardReader()) + .on('end', endListener.bind(this)) + .on('packet', packetListener) + + this.to = to + + this.end = function() { + conn.end() + } + + events.EventEmitter.call(this) +} + +util.inherits(ForwardHandler, events.EventEmitter) + +// Handles a single connection +function DestHandler(id, conn, to) { + function endListener() { + conn.removeListener('drain', drainListener) + this.emit('end') + } + + function errorListener() { + writer.end() + } + + function readableListener() { + maybePipeManually() + } + + function drainListener() { + maybePipeManually() + } + + // We can't just pipe to conn because we don't want to end it + // when the dest closes. Instead we'll send a special packet + // to it (which is handled by the writer). + function maybePipeManually() { + var chunk + while ((chunk = writer.read())) { + if (!conn.write(chunk)) { + break + } + } + } + + var dest = net.connect(to) + .on('error', errorListener) + + var writer = dest.pipe(new ForwardWriter(id)) + .on('end', endListener.bind(this)) + .on('readable', readableListener) + + conn.on('drain', drainListener) + + this.end = function() { + dest.end() + } + + this.write = function(chunk) { + dest.write(chunk) + } + + events.EventEmitter.call(this) +} + +util.inherits(DestHandler, events.EventEmitter) + +module.exports = ForwardManager diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 6a64dbf7..4f19b7ef 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -70,6 +70,8 @@ enum MessageType { AccountGetMessage = 62; AccountRemoveMessage = 55; SdStatusMessage = 61; + DeviceForwardAddEvent = 72; + DeviceForwardRemoveEvent = 73; } message Envelope { @@ -406,6 +408,18 @@ message ForwardRemoveMessage { required uint32 devicePort = 1; } +message DeviceForwardAddEvent { + required string serial = 1; + required uint32 devicePort = 2; + required string targetHost = 3; + required uint32 targetPort = 4; +} + +message DeviceForwardRemoveEvent { + required string serial = 1; + required uint32 devicePort = 2; +} + message BrowserOpenMessage { required string url = 1; optional string browser = 2;