diff --git a/lib/cli.js b/lib/cli.js index 93daea44..0af3aff2 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -874,6 +874,9 @@ program .option('-c, --connect-push ' , 'push endpoint' , cliutil.list) + .option('-u, --connect-sub ' + , 'sub endpoint' + , cliutil.list) .action(function(options) { if (!options.secret) { this.missingArgument('--secret') @@ -881,6 +884,9 @@ program if (!options.connectPush) { this.missingArgument('--connect-push') } + if (!options.connectSub) { + this.missingArgument('--connect-sub') + } require('./units/api')({ port: options.port @@ -888,6 +894,7 @@ program , secret: options.secret , endpoints: { push: options.connectPush + , sub: options.connectSub } }) }) @@ -1338,6 +1345,7 @@ program , '--port', options.apiPort , '--secret', options.authSecret , '--connect-push', options.bindAppPull + , '--connect-sub', options.bindAppPub ]) // websocket , procutil.fork(__filename, [ diff --git a/lib/db/api.js b/lib/db/api.js index c1aa36b8..c5d7adfc 100644 --- a/lib/db/api.js +++ b/lib/db/api.js @@ -152,7 +152,7 @@ dbapi.saveDeviceInitialState = function(serial, device) { , ready: false , reverseForwards: [] , remoteConnect: false - , remoteDebugUrl: null + , remoteConnectUrl: null } return db.run(r.table('devices').get(serial).update(data)) .then(function(stats) { @@ -167,14 +167,14 @@ dbapi.saveDeviceInitialState = function(serial, device) { dbapi.setDeviceConnectUrl = function(serial, url) { return db.run(r.table('devices').get(serial).update({ - remoteDebugUrl: url + remoteConnectUrl: url , remoteConnect: true })) } dbapi.unsetDeviceConnectUrl = function(serial, url) { return db.run(r.table('devices').get(serial).update({ - remoteDebugUrl: null + remoteConnectUrl: null , remoteConnect: false })) } diff --git a/lib/units/api/controllers/devices.js b/lib/units/api/controllers/devices.js index 4afc3b9f..39a68fbf 100644 --- a/lib/units/api/controllers/devices.js +++ b/lib/units/api/controllers/devices.js @@ -49,24 +49,22 @@ function getDeviceBySerial(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - - if(fields) { - device = _.pick(device, fields.split(',')) - } - - res.json({ - success: true - , device: device - }) - } - else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if(fields) { + device = _.pick(device, fields.split(',')) + } + + res.json({ + success: true + , device: device + }) }) .catch(function(err) { log.error('Failed to load device "%s": ', req.params.serial, err.stack) diff --git a/lib/units/api/controllers/user.js b/lib/units/api/controllers/user.js index fcab8155..581b9c64 100644 --- a/lib/units/api/controllers/user.js +++ b/lib/units/api/controllers/user.js @@ -1,13 +1,16 @@ var util = require('util') -var Promise = require('bluebird') var _ = require('lodash') +var Promise = require('bluebird') +var uuid = require('node-uuid') var dbapi = require('../../../db/api') var logger = require('../../../util/logger') var datautil = require('../../../util/datautil') +var deviceutil = require('../../../util/deviceutil') var wire = require('../../../wire') var wireutil = require('../../../wire/util') +var wirerouter = require('../../../wire/router') var log = logger.createLogger('api:controllers:user') @@ -66,32 +69,29 @@ function getUserDeviceBySerial(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - - if (device.owner && device.owner.email === req.user.email) { - if(fields) { - device = _.pick(device, fields.split(',')) - } - - res.json({ - success: true - , device: device - }) - } - else { - res.status(401).json({ - success: false - , description: 'Device is not owned by you' - }) - } - } - else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if (!deviceutil.isOwnedByUser(device, req.user)) { + return res.status(403).json({ + success: false + , description: 'Device is not owned by you' + }) + } + + if(fields) { + device = _.pick(device, fields.split(',')) + } + + res.json({ + success: true + , device: device + }) }) .catch(function(err) { log.error('Failed to load device "%s": ', req.params.serial, err.stack) @@ -107,55 +107,71 @@ function addUserDevice(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - if(device.present && device.ready && !device.using && !device.owner) { - - var requirements = { - 'serial': { - 'value': serial, - 'match': 'exact' - } - } - - req.options.push.send([ - device.channel - , wireutil.envelope( - new wire.GroupMessage( - new wire.OwnerMessage( - req.user.email - , req.user.name - , req.user.group - ) - , timeout - , wireutil.toDeviceRequirements(requirements) - ) - ) - ]) - - res.status(202).json({ - success: true - , description: 'Device Add request is accepted. Check if device is successfully added using pollingUrl' - , pollingUrl: util.format('%s://%s%s/user/devices/%s' - , req.protocol - , req.get('host') - , req.swagger.operation.api.basePath - , serial - ) - }) - - } else { - res.status(401).json({ - success: false - , description: 'Device is being used or not available' - }) - } - } else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false - , description: 'Bad device serial' + , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if (!deviceutil.isAddable(device, req.user)) { + return res.status(403).json({ + success: false + , description: 'Device is being used or not available' + }) + } + + // Timer will be called if no JoinGroupMessage is received till 5 seconds + var responseTimer = setTimeout(function() { + req.options.channelRouter.removeListener(wireutil.global, messageListener) + return res.status(504).json({ + success: false + , description: 'Device is not responding' + }) + }, 5000) + + var messageListener = wirerouter() + .on(wire.JoinGroupMessage, function(channel, message) { + if (message.serial === serial && message.owner.email === req.user.email) { + clearTimeout(responseTimer) + req.options.channelRouter.removeListener(wireutil.global, messageListener) + + return res.json({ + success: true + , description: 'Device successfully added' + }) + } + }) + .handler() + + req.options.channelRouter.on(wireutil.global, messageListener) + + req.options.push.send([ + device.channel + , wireutil.envelope( + new wire.GroupMessage( + new wire.OwnerMessage( + req.user.email + , req.user.name + , req.user.group + ) + , timeout + , wireutil.toDeviceRequirements({ + 'serial': { + 'value': serial + , 'match': 'exact' + } + }) + ) + ) + ]) + }) + .catch(function(err) { + log.error('Failed to load device "%s": ', req.params.serial, err.stack) + res.status(500).json({ + success: false + }) }) } @@ -164,49 +180,66 @@ function deleteUserDeviceBySerial(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - if(device.using && device.owner.email == req.user.email) { - - var requirements = { - 'serial': { - 'value': serial, - 'match': 'exact' - } - } - - req.options.push.send([ - device.channel - , wireutil.envelope( - new wire.UngroupMessage( - wireutil.toDeviceRequirements(requirements) - ) - ) - ]) - - res.status(202).json({ - success: true - , description: 'Device Release request is accepted. Check if device is successfully removed using pollingUrl' - , pollingUrl: util.format('%s://%s%s/user/devices/%s' - , req.protocol - , req.get('host') - , req.swagger.operation.api.basePath - , serial - ) - }) - - } else { - res.status(401).json({ - success: false - , description: 'You cannot kick this device' - }) - } - } else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false - , description: 'Bad device serial' + , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if (!deviceutil.isOwnedByUser(device, req.user)) { + return res.status(403).json({ + success: false + , description: 'You cannot release this device. Not owned by you' + }) + + } + + // Timer will be called if no JoinGroupMessage is received till 5 seconds + var responseTimer = setTimeout(function() { + req.options.channelRouter.removeListener(wireutil.global, messageListener) + return res.status(504).json({ + success: false + , description: 'Device is not responding' + }) + }, 5000) + + var messageListener = wirerouter() + .on(wire.LeaveGroupMessage, function(channel, message) { + if (message.serial === serial && message.owner.email === req.user.email) { + clearTimeout(responseTimer) + req.options.channelRouter.removeListener(wireutil.global, messageListener) + + return res.json({ + success: true + , description: 'Device successfully removed' + }) + } + }) + .handler() + + req.options.channelRouter.on(wireutil.global, messageListener) + + req.options.push.send([ + device.channel + , wireutil.envelope( + new wire.UngroupMessage( + wireutil.toDeviceRequirements({ + 'serial': { + 'value': serial + , 'match': 'exact' + } + }) + ) + ) + ]) + }) + .catch(function(err) { + log.error('Failed to load device "%s": ', req.params.serial, err.stack) + res.status(500).json({ + success: false + }) }) } @@ -215,41 +248,58 @@ function remoteConnectUserDeviceBySerial(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - - if (device.present && device.ready && device.using && device.owner.email === req.user.email) { - req.options.push.send([ - device.channel - , wireutil.envelope( - new wire.ConnectStartMessage() - ) - ]) - - res.status(202).json({ - success: true - , description: 'Device Connect request is accepted. Check if device is successfully connected using pollingUrl' - , pollingUrl: util.format('%s://%s%s/user/devices/%s' - , req.protocol - , req.get('host') - , req.swagger.operation.api.basePath - , serial - ) - }) - } - else { - res.status(401).json({ - success: false - , description: 'Device is not owned by you or is not available' - }) - } - } - else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if (!deviceutil.isOwnedByUser(device, req.user)) { + return res.status(403).json({ + success: false + , description: 'Device is not owned by you or is not available' + }) + } + + var responseChannel = 'txn_' + uuid.v4() + req.options.sub.subscribe(responseChannel) + + // Timer will be called if no JoinGroupMessage is received till 5 seconds + var timer = setTimeout(function() { + req.options.channelRouter.removeListener(responseChannel, messageListener) + req.options.sub.unsubscribe(responseChannel) + return res.status(504).json({ + success: false + , description: 'Device is not responding' + }) + }, 5000) + + var messageListener = wirerouter() + .on(wire.ConnectStartedMessage, function(channel, message) { + if (message.serial === serial) { + clearTimeout(timer) + req.options.sub.unsubscribe(responseChannel) + req.options.channelRouter.removeListener(responseChannel, messageListener) + + return res.json({ + success: true + , remoteConnectUrl: message.url + }) + } + }) + .handler() + + req.options.channelRouter.on(responseChannel, messageListener) + + req.options.push.send([ + device.channel + , wireutil.transaction( + responseChannel + , new wire.ConnectStartMessage() + ) + ]) }) .catch(function(err) { log.error('Failed to load device "%s": ', req.params.serial, err.stack) @@ -264,41 +314,59 @@ function remoteDisconnectUserDeviceBySerial(req, res) { dbapi.loadDevice(serial) .then(function(device) { - if (device) { - datautil.normalize(device, req.user) - if (device.present && device.ready && device.using && device.owner.email == req.user.email) { - req.options.push.send([ - device.channel - , wireutil.envelope( - new wire.ConnectStopMessage() - ) - ]) - - res.status(202).json({ - success: true - , description: 'Device Disonnect request is accepted. Check if device is successfully disconnected using pollingUrl' - , pollingUrl: util.format('%s://%s%s/user/devices/%s' - , req.protocol - , req.get('host') - , req.swagger.operation.api.basePath - , serial - ) - }) - } - else { - res.status(401).json({ - success: false - , description: 'Device is not owned by you or is not available' - }) - } - } - else { - res.status(404).json({ + if (!device) { + return res.status(404).json({ success: false , description: 'Device not found' }) } + + datautil.normalize(device, req.user) + if (!deviceutil.isOwnedByUser(device, req.user)) { + return res.status(403).json({ + success: false + , description: 'Device is not owned by you or is not available' + }) + } + + var responseChannel = 'txn_' + uuid.v4() + req.options.sub.subscribe(responseChannel) + + // Timer will be called if no JoinGroupMessage is received till 5 seconds + var timer = setTimeout(function() { + req.options.channelRouter.removeListener(responseChannel, messageListener) + req.options.sub.unsubscribe(responseChannel) + return res.status(504).json({ + success: false + , description: 'Device is not responding' + }) + }, 5000) + + var messageListener = wirerouter() + .on(wire.ConnectStoppedMessage, function(channel, message) { + if (message.serial === serial) { + clearTimeout(timer) + req.options.sub.unsubscribe(responseChannel) + req.options.channelRouter.removeListener(responseChannel, messageListener) + + return res.json({ + success: true + , description: 'Device remote disconnected successfully' + }) + } + }) + .handler() + + req.options.channelRouter.on(responseChannel, messageListener) + + req.options.push.send([ + device.channel + , wireutil.transaction( + responseChannel + , new wire.ConnectStopMessage() + ) + ]) }) .catch(function(err) { log.error('Failed to load device "%s": ', req.params.serial, err.stack) diff --git a/lib/units/api/index.js b/lib/units/api/index.js index 16fc1711..8bc3dd46 100644 --- a/lib/units/api/index.js +++ b/lib/units/api/index.js @@ -1,5 +1,6 @@ var http = require('http') var path = require('path') +var events = require('events') var express = require('express') var SwaggerExpress = require('swagger-express-mw') @@ -11,11 +12,13 @@ var logger = require('../../util/logger') var zmqutil = require('../../util/zmqutil') var srv = require('../../util/srv') var lifecycle = require('../../util/lifecycle') +var wireutil = require('../../wire/util') module.exports = function(options) { var log = logger.createLogger('api') , app = express() , server = http.createServer(app) + , channelRouter = new events.EventEmitter() var push = zmqutil.socket('push') Promise.map(options.endpoints.push, function(endpoint) { @@ -27,18 +30,47 @@ module.exports = function(options) { }) }) }) - .catch(function(err) { + .catch(function(err) {w log.fatal('Unable to connect to push endpoint', err) lifecycle.fatal() }) + // Input + var sub = zmqutil.socket('sub') + Promise.map(options.endpoints.sub, function(endpoint) { + return srv.resolve(endpoint).then(function(records) { + return srv.attempt(records, function(record) { + log.info('Receiving input from "%s"', record.url) + sub.connect(record.url) + return Promise.resolve(true) + }) + }) + }) + .catch(function(err) { + log.fatal('Unable to connect to sub endpoint', err) + lifecycle.fatal() + }) + + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + + sub.on('message', function(channel, data) { + channelRouter.emit(channel.toString(), channel, data) + }) + + // Swagger Express Config var config = { appRoot: __dirname , swaggerFile: path.resolve(__dirname, 'swagger', 'api_v1.yaml') }; SwaggerExpress.create(config, function(err, swaggerExpress) { - if (err) { throw err; } + if (err) { + throw err + } swaggerExpress.register(app); }) @@ -47,6 +79,8 @@ module.exports = function(options) { app.use(function(req, res, next) { var reqOptions = _.merge(options, { 'push': push + , 'sub': sub + , 'channelRouter': channelRouter }) req.options = reqOptions @@ -59,6 +93,17 @@ module.exports = function(options) { , keys: [options.secret] })) + lifecycle.observe(function() { + [push, sub].forEach(function(sock) { + try { + sock.close() + } + catch (err) { + // No-op + } + }) + }) + server.listen(options.port) log.info('Listening on port %d', options.port) } diff --git a/lib/units/api/swagger/api_v1.yaml b/lib/units/api/swagger/api_v1.yaml index aa455bdd..10a9c97f 100644 --- a/lib/units/api/swagger/api_v1.yaml +++ b/lib/units/api/swagger/api_v1.yaml @@ -46,7 +46,7 @@ paths: /user/devices: x-swagger-router-controller: user get: - summary: List devices owned by current user + summary: User Devices description: The User Devices endpoint returns device list owner by current authorized user operationId: getUserDevices tags: @@ -70,22 +70,20 @@ paths: - accessTokenAuth: [] post: summary: Add a device to a user - description: The User Devices endpoint will request stf server for a new device. It will return request accepted if device is usable + description: The User Devices endpoint will request stf server for a new device. operationId: addUserDevice tags: - user parameters: - - name: devices + - name: device in: body description: Device to add required: true schema: $ref: "#/definitions/AddUserDevicePayload" responses: - "202": - description: Add User Device Request Status and polling Url - schema: - $ref: "#/definitions/AddUserDeviceResponse" + "200": + description: Add User Device Status default: description: Unexpected Error schema: @@ -95,8 +93,8 @@ paths: /user/devices/{serial}: x-swagger-router-controller: user get: - summary: Device Information - description: The device enpoint return information about device owned by user + summary: User Device + description: The devices enpoint return information about device owned by user operationId: getUserDeviceBySerial tags: - user @@ -113,9 +111,9 @@ paths: type: string responses: "200": - description: Delete User Device Request Status and polling Url + description: Device Information owned by user schema: - $ref: "#/definitions/DeleteUserDeviceBySerialResponse" + $ref: "#/definitions/DeviceResponse" default: description: Unexpected Error schema: @@ -123,7 +121,7 @@ paths: security: - accessTokenAuth: [] delete: - summary: Release device from user + summary: Delete User Device description: The User Devices endpoint will request for device release from stf server. It will return request accepted if device is being used by current user operationId: deleteUserDeviceBySerial tags: @@ -135,8 +133,8 @@ paths: required: true type: string responses: - "202": - description: Device Release Request Status + "200": + description: Delete User Device Status default: description: Unexpected Error schema: @@ -160,7 +158,7 @@ paths: required: true type: string responses: - "202": + "200": description: Remote Connect User Device Request Status schema: $ref: "#/definitions/RemoteConnectUserDeviceResponse" @@ -183,10 +181,8 @@ paths: required: true type: string responses: - "202": + "200": description: Remote Disonnect User Device Request Status - schema: - $ref: "#/definitions/RemoteDisconnectUserDeviceResponse" default: description: Unexpected Error schema: @@ -194,7 +190,7 @@ paths: security: - accessTokenAuth: [] /user/accessTokens: - x-swagger-router-controller: token + x-swagger-router-controller: user get: summary: Access Tokens description: The Access Tokens endpoints returns titles of all the valid access tokens @@ -298,35 +294,14 @@ definitions: properties: device: type: object - AddUserDeviceResponse: - required: - - pollingUrl - properties: - pollingUrl: - type: string - DeleteUserDeviceBySerialResponse: - required: - - pollingUrl - properties: - pollingUrl: - type: string - RemoteDisconnectUserDeviceResponse: - required: - - pollingUrl - properties: - pollingUrl: - type: string RemoteConnectUserDeviceResponse: required: - - pollingUrl + - remoteConnectUrl + - serial properties: - pollingUrl: + remoteConnectUrl: type: string - ErrorResponse: - required: - - message - properties: - message: + serial: type: string AddUserDevicePayload: description: payload object for adding device to user @@ -339,7 +314,12 @@ definitions: timeout: description: Device timeout in ms. If device is kept idle for this period, it will be automatically disconnected. Default is provider group timeout type: integer - + ErrorResponse: + required: + - message + properties: + message: + type: string securityDefinitions: accessTokenAuth: type: apiKey diff --git a/lib/units/device/plugins/connect.js b/lib/units/device/plugins/connect.js index 55d93ef6..90bf3f79 100644 --- a/lib/units/device/plugins/connect.js +++ b/lib/units/device/plugins/connect.js @@ -142,15 +142,16 @@ module.exports = syrup.serial() channel , reply.okay(url) ]) + // Update DB push.send([ - wireutil.global + channel , wireutil.envelope(new wire.ConnectStartedMessage( options.serial , url )) ]) - log.info('Remote Connect Started for device "%s" at "%s"', options.serial, url) + log.important('Remote Connect Started for device "%s" at "%s"', options.serial, url) }) .catch(function(err) { log.error('Unable to start remote connect service', err.stack) @@ -170,12 +171,12 @@ module.exports = syrup.serial() ]) // Update DB push.send([ - wireutil.global + channel , wireutil.envelope(new wire.ConnectStoppedMessage( options.serial )) ]) - log.info('Remote Connect Stopped for device "%s"', options.serial) + log.important('Remote Connect Stopped for device "%s"', options.serial) }) .catch(function(err) { log.error('Failed to stop connect service', err.stack) diff --git a/lib/units/processor/index.js b/lib/units/processor/index.js index 733e6852..886fcdbb 100644 --- a/lib/units/processor/index.js +++ b/lib/units/processor/index.js @@ -167,9 +167,11 @@ module.exports = function(options) { }) .on(wire.ConnectStartedMessage, function(channel, message, data) { dbapi.setDeviceConnectUrl(message.serial, message.url) + appDealer.send([channel, data]) }) .on(wire.ConnectStoppedMessage, function(channel, message, data) { dbapi.unsetDeviceConnectUrl(message.serial) + appDealer.send([channel, data]) }) .on(wire.JoinGroupMessage, function(channel, message, data) { dbapi.setDeviceOwner(message.serial, message.owner) diff --git a/lib/util/datautil.js b/lib/util/datautil.js index 8b04b368..12dd9e42 100644 --- a/lib/util/datautil.js +++ b/lib/util/datautil.js @@ -59,7 +59,7 @@ datautil.applyOwnerOnlyInfo = function(device, user) { if (device.owner && device.owner.email === user.email) { } else { device.remoteConnect = false - device.remoteDebugUrl = null + device.remoteConnectUrl = null } } diff --git a/lib/util/deviceutil.js b/lib/util/deviceutil.js new file mode 100644 index 00000000..d5264830 --- /dev/null +++ b/lib/util/deviceutil.js @@ -0,0 +1,13 @@ +var logger = require('./logger') + +var log = logger.createLogger('util:deviceutil') + +var deviceutil = module.exports = Object.create(null) + +deviceutil.isOwnedByUser = function(device, user) { + return device.present && device.ready && device.owner && device.owner.email === user.email && device.using +} + +deviceutil.isAddable = function(device, user) { + return device.present && device.ready && !device.using && !device.owner +}