diff --git a/lib/db/api.js b/lib/db/api.js index d1606897..3345dc38 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -100,23 +100,6 @@ dbapi.lookupUserByAdbFingerprint = function(fingerprint) { }) } -dbapi.addUserForward = function(email, forward) { - var devicePort = forward.devicePort - return db.run(r.table('users').get(email).update({ - forwards: r.row('forwards').default([]).filter(function(forward) { - return forward('devicePort').ne(devicePort) - }).append(forward) - })) -} - -dbapi.removeUserForward = function(email, devicePort) { - return db.run(r.table('users').get(email).update({ - forwards: r.row('forwards').default([]).filter(function(forward) { - return forward('devicePort').ne(devicePort) - }) - })) -} - dbapi.loadGroup = function(email) { return db.run(r.table('devices').getAll(email, { index: 'owner' diff --git a/lib/units/device/plugins/forward.js b/lib/units/device/plugins/forward.js index d8949175..33526d3a 100644 --- a/lib/units/device/plugins/forward.js +++ b/lib/units/device/plugins/forward.js @@ -1,157 +1,174 @@ var net = require('net') -var util = require('util') -var syrup = require('syrup') var Promise = require('bluebird') -var split = require('split') +var syrup = require('syrup') -var logger = require('../../../util/logger') -var devutil = require('../../../util/devutil') -var streamutil = require('../../../util/streamutil') -var lifecycle = require('../../../util/lifecycle') var wire = require('../../../wire') +var logger = require('../../../util/logger') +var lifecycle = require('../../../util/lifecycle') +var streamutil = require('../../../util/streamutil') +var forwardutil = require('../../../util/forwardutil') var wireutil = require('../../../wire/util') module.exports = syrup.serial() .dependency(require('../support/adb')) .dependency(require('../support/router')) .dependency(require('../support/push')) - .dependency(require('../resources/remote')) - .define(function(options, adb, router, push, remote) { + .dependency(require('../resources/minirev')) + .define(function(options, adb, router, push, minirev) { var log = logger.createLogger('device:plugins:forward') + var plugin = Object.create(null) - var service = { - port: 2810 - , privatePorts: (function() { - var ports = [] - for (var i = 2520; i <= 2540; ++i) { - ports.push(i) - } - return ports - })() - , forwards: Object.create(null) - } + function ForwardManager() { + var forwards = Object.create(null) - function openService() { - log.info('Launching reverse port forwarding service') - return devutil.ensureUnusedPort(adb, options.serial, service.port) - .timeout(10000) - .then(function() { - return adb.shell(options.serial, [ - 'exec' - , remote.bin - , '--lib', remote.lib - , '--listen-forward', service.port - ]) - .timeout(10000) - .then(function(out) { - lifecycle.share('Forward shell', out) - streamutil.talk(log, 'Forward shell says: "%s"', out) - }) - .then(function() { - return devutil.waitForPort(adb, options.serial, service.port) - .timeout(10000) - }) - .then(function(conn) { - conn.end() - }) - }) - } + function Forward(conn, to) { + var proxies = Object.create(null) - function createForward(data) { - log.info( - 'Reverse port forwarding port %d to %s:%d' - , data.devicePort - , data.targetHost - , data.targetPort - ) - - var forward = service.forwards[data.devicePort] - - if (forward) { - if (forward.targetHost === data.targetHost && - forward.targetPort === data.targetPort) { - return Promise.resolve() - } - else if (forward.system) { - return Promise.reject(new Error('Cannot rebind system port')) - } - else { - removeForward(forward) - } - } - - return adb.openTcp(options.serial, service.port) - .timeout(10000) - .then(function(conn) { - var resolver = Promise.defer() - - var forward = { - devicePort: data.devicePort - , targetHost: data.targetHost - , targetPort: data.targetPort - , system: !!data.system - , privatePort: service.privatePorts.pop() - , connection: conn + function Proxy(fd) { + function maybeSend() { + var chunk + while ((chunk = this.read())) { + if (!conn.write(chunk)) { + break + } + } } - var parser = conn.pipe(split()) + function killListeners() { + src.removeListener('readable', maybeSend) + conn.removeListener('drain', maybeSend) + conn.removeListener('end', killListeners) + } - parser.on('data', function(chunk) { - var cmd = chunk.toString().trim() - switch (cmd) { - case 'OKAY': - resolver.resolve(forward) - break - case 'FAIL': - resolver.reject(new Error('Remote replied with FAIL')) - break - case 'CNCT': - adb.openTcp(options.serial, forward.privatePort) - .done(function(dstream) { - return tryConnect(forward) - .then(function(ustream) { - ustream.pipe(dstream) - dstream.pipe(ustream) - }) - }) - break + var src = new forwardutil.ForwardWriter(fd) + .on('readable', maybeSend) + .on('error', function(err) { + log.error('Proxy writer %d had an error', fd, to, err.stack) + }) + + conn.on('drain', maybeSend) + conn.on('end', killListeners) + + this.dest = net.connect(to) + .once('end', function() { + delete proxies[fd] + killListeners() + }) + .on('error', function(err) { + log.error('Proxy reader %d had an error', fd, to, err.stack) + }) + + this.dest.pipe(src) + + this.stop = function() { + //this.dest.unpipe(this.src) + this.dest.end() + }.bind(this) + } + + conn.pipe(new forwardutil.ForwardParser()) + .on('packet', function(fd, packet) { + var proxy = proxies[fd] + + if (!proxy) { + // New connection + proxy = proxies[fd] = new Proxy(fd) + } + + proxy.dest.write(packet) + }) + .on('fin', function(fd) { + // The connection ended + if (proxies[fd]) { + proxies[fd].stop() } }) - // Keep this around - function endListener() { - removeForward(forward) - } + this.end = function() { + conn.end() + } + } - conn.on('end', endListener) + this.add = function(port, conn, to) { + forwards[port] = new Forward(conn, to) + } - conn.write(util.format( - 'FRWD %d %d\n' - , forward.devicePort - , forward.privatePort - )) - - return resolver.promise - }) - } - - function removeForward(data) { - log.info('Removing reverse port forwarding on port %d', data.devicePort) - var forward = service.forwards[data.devicePort] - if (forward) { - forward.connection.end() - delete service.forwards[data.devicePort] + this.remove = function(port) { + if (forwards[port]) { + forwards[port].end() + } } } - function tryConnect(data) { + var manager = new ForwardManager() + + function startService() { + log.info('Launching reverse port forwarding service') + return adb.shell(options.serial, [ + 'exec' + , minirev.bin + ]) + .timeout(10000) + .then(function(out) { + lifecycle.share('Forward shell', out) + streamutil.talk(log, 'Forward shell says: "%s"', out) + }) + } + + function connectService(times) { + function tryConnect(times, delay) { + return adb.openLocal(options.serial, 'localabstract:minirev') + .timeout(10000) + .catch(function(err) { + if (/closed/.test(err.message) && times > 1) { + return Promise.delay(delay) + .then(function() { + return tryConnect(--times, delay * 2) + }) + } + return Promise.reject(err) + }) + } + log.info('Connecting to reverse port forwarding service') + return tryConnect(times, 100) + } + + function awaitServer() { + return connectService(5) + .then(function(conn) { + conn.end() + return true + }) + } + + plugin.createForward = function(port, to) { + log.info( + 'Creating reverse port forward from ":%d" to "%s:%d"' + , port + , to.host + , to.port + ) + return connectService(1) + .then(function(out) { + var header = new Buffer(4) + header.writeUInt16LE(0, 0) + header.writeUInt16LE(port, 2) + out.write(header) + return manager.add(port, out, to) + }) + } + + plugin.removeForward = function(port) { + log.info('Removing reverse port forward ":%d"', port) + manager.remove(port) + return Promise.resolve() + } + + plugin.connect = function(options) { var resolver = Promise.defer() - var conn = net.connect({ - host: data.targetHost - , port: data.targetPort - }) + var conn = net.connect(options) function connectListener() { resolver.resolve(conn) @@ -170,31 +187,16 @@ module.exports = syrup.serial() }) } - function resetForwards() { - Object.keys(service.forwards).forEach(function(privatePort) { - service.forwards[privatePort].connection.end() - delete service.forwards[privatePort] - }) - } - - function listForwards() { - return Object.keys(service.forwards).map(function(privatePort) { - var forward = service.forwards[privatePort] - return { - devicePort: forward.devicePort - , targetHost: forward.targetHost - , targetPort: forward.targetPort - , system: !!forward.system - } - }) - } - - return openService() + return startService() + .then(awaitServer) .then(function() { router .on(wire.ForwardTestMessage, function(channel, message) { var reply = wireutil.reply(options.serial) - tryConnect(message) + plugin.connect({ + host: message.targetHost + , port: message.targetPort + }) .then(function(conn) { conn.end() push.send([ @@ -211,7 +213,10 @@ module.exports = syrup.serial() }) .on(wire.ForwardCreateMessage, function(channel, message) { var reply = wireutil.reply(options.serial) - createForward(message) + plugin.createForward(message.devicePort, { + host: message.targetHost + , port: message.targetPort + }) .then(function() { push.send([ channel @@ -228,11 +233,21 @@ module.exports = syrup.serial() }) .on(wire.ForwardRemoveMessage, function(channel, message) { var reply = wireutil.reply(options.serial) - removeForward(message) - push.send([ - channel - , reply.okay('success') - ]) + plugin.removeForward(message.devicePort) + .then(function() { + push.send([ + channel + , reply.okay('success') + ]) + }) + .catch(function(err) { + log.error('Reverse port unforwarding failed', err.stack) + push.send([ + channel + , reply.fail('fail') + ]) + }) }) }) + .return(plugin) }) diff --git a/lib/units/device/resources/minirev.js b/lib/units/device/resources/minirev.js new file mode 100644 index 00000000..b0778fe0 --- /dev/null +++ b/lib/units/device/resources/minirev.js @@ -0,0 +1,93 @@ +var util = require('util') + +var Promise = require('bluebird') +var syrup = require('syrup') + +var logger = require('../../../util/logger') +var pathutil = require('../../../util/pathutil') +var devutil = require('../../../util/devutil') +var streamutil = require('../../../util/streamutil') + +module.exports = syrup.serial() + .dependency(require('../support/adb')) + .dependency(require('../support/properties')) + .define(function(options, adb, properties) { + var log = logger.createLogger('device:resources:minirev') + + var resources = { + bin: { + src: pathutil.vendor(util.format( + 'minirev/%s/minirev' + , properties['ro.product.cpu.abi'] + )) + , dest: '/data/local/tmp/minirev' + , comm: 'minirev' + , mode: 0755 + } + } + + function removeResource(res) { + return adb.shell(options.serial, ['rm', res.dest]) + .timeout(10000) + .then(function(out) { + return streamutil.readAll(out) + }) + .return(res) + } + + function installResource(res) { + return adb.push(options.serial, res.src, res.dest, res.mode) + .timeout(10000) + .then(function(transfer) { + return new Promise(function(resolve, reject) { + transfer.on('error', reject) + transfer.on('end', resolve) + }) + }) + .return(res) + } + + function ensureNotBusy(res) { + return adb.shell(options.serial, [res.dest, '-h']) + .timeout(10000) + .then(function(out) { + // Can be "Text is busy", "text busy" + return streamutil.findLine(out, (/busy/i)) + .timeout(10000) + .then(function() { + log.info('Binary is busy, will retry') + return Promise.delay(1000) + }) + .then(function() { + return ensureNotBusy(res) + }) + .catch(streamutil.NoSuchLineError, function() { + return res + }) + }) + } + + function installAll() { + return Promise.all([ + removeResource(resources.bin).then(installResource).then(ensureNotBusy) + ]) + } + + function stop() { + return devutil.killProcsByComm( + adb + , options.serial + , resources.bin.comm + , resources.bin.dest + ) + .timeout(15000) + } + + return stop() + .then(installAll) + .then(function() { + return { + bin: resources.bin.dest + } + }) + }) diff --git a/lib/units/websocket/index.js b/lib/units/websocket/index.js index 21992b10..bae570b2 100644 --- a/lib/units/websocket/index.js +++ b/lib/units/websocket/index.js @@ -676,32 +676,26 @@ module.exports = function(options) { if (!data.targetHost || data.targetHost === 'localhost') { data.targetHost = socket.request.ip } - dbapi.addUserForward(user.email, data) - .then(function() { - socket.emit('forward.create', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardCreateMessage(data) - ) - ]) - }) + socket.emit('forward.create', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardCreateMessage(data) + ) + ]) }) .on('forward.remove', function(channel, responseChannel, data) { - dbapi.removeUserForward(user.email, data.devicePort) - .then(function() { - socket.emit('forward.remove', data) - joinChannel(responseChannel) - push.send([ - channel - , wireutil.transaction( - responseChannel - , new wire.ForwardRemoveMessage(data) - ) - ]) - }) + socket.emit('forward.remove', data) + joinChannel(responseChannel) + push.send([ + channel + , wireutil.transaction( + responseChannel + , new wire.ForwardRemoveMessage(data) + ) + ]) }) .on('logcat.start', function(channel, responseChannel, data) { joinChannel(responseChannel) diff --git a/lib/util/forwardutil.js b/lib/util/forwardutil.js new file mode 100644 index 00000000..7314cf07 --- /dev/null +++ b/lib/util/forwardutil.js @@ -0,0 +1,116 @@ +var util = require('util') +var stream = require('stream') + +var HEADER_SIZE = 4 +var MAX_PACKET_SIZE = 0xFFFF + +function ForwardParser() { + stream.Transform.call(this) + this._header = new Buffer(4) + this._needLength = -HEADER_SIZE + this._target = 0 +} + +util.inherits(ForwardParser, stream.Transform) + +ForwardParser.prototype._transform = function(chunk, encoding, done) { + var cursor = 0 + + while (cursor < chunk.length) { + var diff = chunk.length - cursor + + // Do we need a header? + if (this._needLength < 0) { + // Still missing a header? + if (chunk.length < -this._needLength) { + // Save what we're received so far. + chunk.copy( + this._header + , HEADER_SIZE + this._needLength + , cursor + , cursor + -this._needLength + ) + break + } + + // Combine previous and current chunk in case the header was split. + chunk.copy( + this._header + , HEADER_SIZE + this._needLength + , cursor + , cursor + -this._needLength + ) + + cursor += -this._needLength + + this._target = this._header.readUInt16LE(0) + this._needLength = this._header.readUInt16LE(2) + + if (this._needLength === 0) { + // This is a fin packet + this.emit('fin', this._target) + this._needLength = -HEADER_SIZE + } + } + // Do we have a full data packet? + else if (diff >= this._needLength) { + this.emit( + 'packet' + , this._target + , chunk.slice(cursor, cursor + this._needLength) + ) + cursor += this._needLength + this._needLength = -HEADER_SIZE + } + // We have a partial data packet. + else { + this.emit('packet', this._target, chunk.slice(cursor, cursor + diff)) + this._needLength -= diff + cursor += diff + } + } + + done() +} + +module.exports.ForwardParser = ForwardParser + +function ForwardWriter(target) { + stream.Transform.call(this) + this._target = target +} + +util.inherits(ForwardWriter, stream.Transform) + +ForwardWriter.prototype._transform = function(chunk, encoding, done) { + var header + , length + + do { + length = Math.min(MAX_PACKET_SIZE, chunk.length) + + header = new Buffer(HEADER_SIZE) + header.writeUInt16LE(this._target, 0) + header.writeUInt16LE(length, 2) + + this.push(header) + this.push(chunk.slice(0, length)) + + chunk = chunk.slice(length) + } + while (chunk.length) + + done() +} + +ForwardWriter.prototype._flush = function(done) { + var header = new Buffer(HEADER_SIZE) + header.writeUInt16LE(this._target, 0) + header.writeUInt16LE(0, 2) + + this.push(header) + + done() +} + +module.exports.ForwardWriter = ForwardWriter diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 941ad235..6a64dbf7 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -400,7 +400,6 @@ message ForwardCreateMessage { required uint32 devicePort = 1; required string targetHost = 2; required uint32 targetPort = 3; - optional bool system = 4; } message ForwardRemoveMessage { diff --git a/vendor/minirev/armeabi-v7a/minirev b/vendor/minirev/armeabi-v7a/minirev new file mode 100755 index 00000000..a70f9cc8 Binary files /dev/null and b/vendor/minirev/armeabi-v7a/minirev differ diff --git a/vendor/minirev/armeabi/minirev b/vendor/minirev/armeabi/minirev new file mode 100755 index 00000000..29453885 Binary files /dev/null and b/vendor/minirev/armeabi/minirev differ diff --git a/vendor/minirev/mips/minirev b/vendor/minirev/mips/minirev new file mode 100755 index 00000000..60c126c9 Binary files /dev/null and b/vendor/minirev/mips/minirev differ diff --git a/vendor/minirev/x86/minirev b/vendor/minirev/x86/minirev new file mode 100755 index 00000000..8a35397f Binary files /dev/null and b/vendor/minirev/x86/minirev differ