diff --git a/lib/cli.js b/lib/cli.js index 483523bf..f6272dce 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -380,31 +380,6 @@ program }) }) -program - .command('console') - .description('start console') - .option('-s, --connect-sub ' - , 'sub endpoint' - , cliutil.list) - .option('-p, --connect-push ' - , 'push endpoint' - , cliutil.list) - .action(function(options) { - if (!options.connectSub) { - this.missingArgument('--connect-sub') - } - if (!options.connectPush) { - this.missingArgument('--connect-push') - } - - require('./roles/console')({ - endpoints: { - sub: options.connectSub - , push: options.connectPush - } - }) - }) - program .command('local [serial..]') .description('start everything locally') diff --git a/lib/roles/console.js b/lib/roles/console.js deleted file mode 100644 index 1ee3746f..00000000 --- a/lib/roles/console.js +++ /dev/null @@ -1,161 +0,0 @@ -var readline = require('readline') - -var zmq = require('zmq') -var Promise = require('bluebird') - -var logger = require('../util/logger') -var tx = require('../util/tx') -var wire = require('../wire') -var wireutil = require('../wire/util') - -module.exports = function(options) { - var log = logger.createLogger('console') - - // Input - var sub = zmq.socket('sub') - sub.subscribe('ALL') - options.endpoints.sub.forEach(function(endpoint) { - log.info('SUB connected to %s', endpoint) - sub.connect(endpoint) - }) - - // Output - var push = zmq.socket('push') - options.endpoints.push.forEach(function(endpoint) { - log.info('PUSH connected to %s', endpoint) - push.connect(endpoint) - }) - - // User input - var rl = readline.createInterface({ - input: process.stdin - , output: process.stdout - }) - rl.setPrompt('stf> ') - rl.prompt() - - var clients = [] - , group = wireutil.global - - rl.on('line', function(line) { - var args = line.trim().split(/\s+/g) - switch (args.shift()) { - case 'help': - console.log() - console.log('Available commands:') - console.log() - console.log(' help - show help') - console.log() - rl.prompt() - break - case 'invite': - if (clients.length) { - log.error('We already have clients') - break - } - - var ours = group = wireutil.makePrivateChannel() - - var messageListener = function(theirs, data) { - if (theirs.toString() === ours) { - var wrapper = wire.Envelope.decode(data) - switch (wrapper.type) { - case wire.MessageType.JOIN_GROUP: - var message = wire.JoinGroupMessage.decode(wrapper.message) - clients.push(message.serial) - log.info('"%s" joined', message.serial) - break - case wire.MessageType.LEAVE_GROUP: - var message = wire.LeaveGroupMessage.decode(wrapper.message) - , index = clients.indexOf(message.serial) - clients.splice(index, 1) - log.info('"%s" left', message.serial) - break - default: - throw new Error('Unexpected response') - } - } - } - - sub.subscribe(ours) - sub.on('message', messageListener) - - push.send([wireutil.global, wireutil.makeGroupMessage( - ours - , 10000 - , [] - )]) - - Promise.delay(1000) - .then(function() { - rl.prompt() - }) - break - case 'shell': - var resolvers = {} - , ours = wireutil.makePrivateChannel() - , counter = 0 - - log.debug('Using channel "%s"', ours) - - clients.forEach(function(client) { - resolvers[client] = Promise.defer() - }) - - var messageListener = function(theirs, data) { - if (theirs.toString() === ours) { - var wrapper = wire.Envelope.decode(data) - switch (wrapper.type) { - case wire.MessageType.DEVICE_DATA: - var message = wire.DeviceDataMessage.decode(wrapper.message) - log.info('[%s] DATA: %s' - , message.serial, message.data.toUTF8().trim()) - break - case wire.MessageType.DEVICE_DONE: - var message = wire.DeviceDoneMessage.decode(wrapper.message) - log.info('[%s] DONE', message.serial) - resolvers[message.serial].resolve() - break - case wire.MessageType.DEVICE_FAIL: - var message = wire.DeviceFailMessage.decode(wrapper.message) - log.error('[%s] FAIL: ', message.serial, message.reason) - resolvers[message.serial].reject(new Error(message.reason)) - break - default: - throw new Error('Unexpected response') - } - } - } - - - sub.subscribe(ours) - sub.on('message', messageListener) - - push.send([group, wireutil.makeShellCommandMessage( - ours - , args - )]) - - var promises = Object.keys(resolvers).map(function(serial) { - return resolvers[serial].promise - }) - - Promise.settle(promises) - .then(function() { - sub.unsubscribe(ours) - sub.removeListener('message', messageListener) - rl.prompt() - }) - break - case 'exit': - case 'quit': - process.exit(0) - break - default: - console.log('Unknown command') - rl.prompt() - break - } - }) - -} diff --git a/lib/util/tx.js b/lib/util/tx.js deleted file mode 100644 index b2bf9222..00000000 --- a/lib/util/tx.js +++ /dev/null @@ -1,87 +0,0 @@ -var uuid = require('node-uuid') -var Promise = require('bluebird') - -function newId() { - return uuid.v4(null, new Buffer(16)) -} - -module.exports.newId = newId - -function compareId(id1, id2) { - for (var i = 0; i < 16; ++i) { - if (id1[i] < id2[i]) return -1 - if (id1[i] > id2[i]) return 1 - } - return 0 -} - -module.exports.compareId = compareId - -function q(output, input, channel, args) { - var deferred = Promise.defer() - , ourId = newId() - , results = [] - , mapping = {} - , remaining = 0 // @todo pass expected number to query - - function onMessage(theirId, serial, state, data) { - if (compareId(ourId, theirId) === 0) { - serial = serial.toString() - state = state.toString() - - var mapped = mapping[serial] - - if (!mapped) { - results.push(mapped = mapping[serial] = { - serial: serial - , state: state - , progress: 0 - , value: null - }) - } - else { - mapped.state = state - } - - switch (state) { - case 'ACK': - deferred.progress(results) - ++remaining - break - case 'PRG': - mapped.progress = +data - deferred.progress(results) - break - case 'ERR': - mapped.value = data - --remaining - break - case 'OKY': - mapped.progress = 100 - mapped.value = data - --remaining - break - } - - if (remaining) { - deferred.progress(results) - } - else { - deferred.resolve(results) - } - } - } - - input.on('message', onMessage) - input.subscribe(ourId) - - output.send([channel, ourId].concat(args)) - - return deferred.promise.finally(function() { - input.unsubscribe(ourId) - input.removeListener('message', onMessage) - mapping = results = null - }) -} - -module.exports.q = q