1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 18:29:17 +02:00

Separate ForwardManager into its own file and attempt to make it a bit more resilient.

This commit is contained in:
Simo Kinnunen 2014-10-14 20:00:49 +09:00
parent 0a6cefcf59
commit 4bbcaa45db
3 changed files with 201 additions and 92 deletions

View file

@ -9,8 +9,7 @@ var lifecycle = require('../../../../util/lifecycle')
var streamutil = require('../../../../util/streamutil')
var wireutil = require('../../../../wire/util')
var ForwardReader = require('./util/reader')
var ForwardWriter = require('./util/writer')
var ForwardManager = require('./util/manager')
module.exports = syrup.serial()
.dependency(require('../../support/adb'))
@ -21,96 +20,6 @@ module.exports = syrup.serial()
.define(function(options, adb, router, push, minirev, group) {
var log = logger.createLogger('device:plugins:forward')
var plugin = Object.create(null)
function ForwardManager() {
var forwards = Object.create(null)
function Forward(conn, to) {
var proxies = Object.create(null)
function Proxy(fd) {
function maybeSend() {
var chunk
while ((chunk = this.read())) {
if (!conn.write(chunk)) {
break
}
}
}
function killListeners() {
src.removeListener('readable', maybeSend)
conn.removeListener('drain', maybeSend)
conn.removeListener('end', killListeners)
}
var src = new ForwardWriter(fd)
.on('readable', maybeSend)
.on('error', function(err) {
log.error('Proxy writer %d had an error', fd, to, err.stack)
})
conn.on('drain', maybeSend)
conn.on('end', killListeners)
this.dest = net.connect(to)
.once('end', function() {
delete proxies[fd]
killListeners()
})
.on('error', function(err) {
log.error('Proxy reader %d had an error', fd, to, err.stack)
})
this.dest.pipe(src)
this.stop = function() {
this.dest.end()
}
}
conn.pipe(new ForwardReader())
.on('packet', function(fd, packet) {
var proxy = proxies[fd]
if (packet) {
if (!proxy) {
// New connection
proxy = proxies[fd] = new Proxy(fd)
}
proxy.dest.write(packet)
}
else {
// The connection ended
if (proxy) {
proxy.stop()
}
}
})
this.end = function() {
conn.end()
}
}
this.add = function(port, conn, to) {
forwards[port] = new Forward(conn, to)
}
this.remove = function(port) {
if (forwards[port]) {
forwards[port].end()
}
}
this.removeAll = function() {
Object.keys(forwards).forEach(function(port) {
forwards[port].end()
})
}
}
var manager = new ForwardManager()
function startService() {
@ -203,9 +112,37 @@ module.exports = syrup.serial()
group.on('leave', plugin.reset)
manager.on('add', function(port, to) {
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceForwardAddEvent(
options.serial
, port
, to.host
, to.port
))
])
})
manager.on('remove', function(port) {
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceForwardRemoveEvent(
options.serial
, port
))
])
})
return startService()
.then(awaitServer)
.then(function() {
plugin.createForward(3000, {
host: '127.0.0.1'
, port: 3000
})
router
.on(wire.ForwardTestMessage, function(channel, message) {
var reply = wireutil.reply(options.serial)

View file

@ -0,0 +1,158 @@
var util = require('util')
var events = require('events')
var net = require('net')
var ForwardReader = require('./reader')
var ForwardWriter = require('./writer')
// Handles multiple ports
function ForwardManager() {
var handlersByPort = Object.create(null)
this.has = function(port) {
return !!handlersByPort[port]
}
this.add = function(port, conn, to) {
function endListener() {
delete handlersByPort[port]
this.emit('remove', port, to)
}
var handler = new ForwardHandler(conn, to)
handler.on('end', endListener.bind(this))
handlersByPort[port] = handler
this.emit('add', port, to)
}
this.remove = function(port) {
var handler = handlersByPort[port]
if (handler) {
handler.end()
}
}
this.removeAll = function() {
Object.keys(handlersByPort).forEach(function(port) {
handlersByPort[port].end()
})
}
this.listAll = function() {
return Object.keys(handlersByPort).map(function(port) {
var handler = handlersByPort[port]
return {
port: port
, to: handler.to
}
})
}
events.EventEmitter.call(this)
}
util.inherits(ForwardManager, events.EventEmitter)
// Handles a single port
function ForwardHandler(conn, to) {
var destHandlersById = Object.create(null)
function endListener() {
this.emit('end')
}
function packetEndListener(id) {
delete destHandlersById[id]
}
function packetListener(id, packet) {
var dest = destHandlersById[id]
if (packet) {
if (!dest) {
// Let's create a new connection
dest = destHandlersById[id] = new DestHandler(id, conn, to)
dest.on('end', packetEndListener.bind(null, id))
}
dest.write(packet)
}
else {
// It's a simulated fin packet
if (dest) {
dest.end()
}
}
}
conn.pipe(new ForwardReader())
.on('end', endListener.bind(this))
.on('packet', packetListener)
this.to = to
this.end = function() {
conn.end()
}
events.EventEmitter.call(this)
}
util.inherits(ForwardHandler, events.EventEmitter)
// Handles a single connection
function DestHandler(id, conn, to) {
function endListener() {
conn.removeListener('drain', drainListener)
this.emit('end')
}
function errorListener() {
writer.end()
}
function readableListener() {
maybePipeManually()
}
function drainListener() {
maybePipeManually()
}
// We can't just pipe to conn because we don't want to end it
// when the dest closes. Instead we'll send a special packet
// to it (which is handled by the writer).
function maybePipeManually() {
var chunk
while ((chunk = writer.read())) {
if (!conn.write(chunk)) {
break
}
}
}
var dest = net.connect(to)
.on('error', errorListener)
var writer = dest.pipe(new ForwardWriter(id))
.on('end', endListener.bind(this))
.on('readable', readableListener)
conn.on('drain', drainListener)
this.end = function() {
dest.end()
}
this.write = function(chunk) {
dest.write(chunk)
}
events.EventEmitter.call(this)
}
util.inherits(DestHandler, events.EventEmitter)
module.exports = ForwardManager

View file

@ -70,6 +70,8 @@ enum MessageType {
AccountGetMessage = 62;
AccountRemoveMessage = 55;
SdStatusMessage = 61;
DeviceForwardAddEvent = 72;
DeviceForwardRemoveEvent = 73;
}
message Envelope {
@ -406,6 +408,18 @@ message ForwardRemoveMessage {
required uint32 devicePort = 1;
}
message DeviceForwardAddEvent {
required string serial = 1;
required uint32 devicePort = 2;
required string targetHost = 3;
required uint32 targetPort = 4;
}
message DeviceForwardRemoveEvent {
required string serial = 1;
required uint32 devicePort = 2;
}
message BrowserOpenMessage {
required string url = 1;
optional string browser = 2;