1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-04 02:09:32 +02:00

Modify APIs from Aysnc -> Sync using timeout

This commit is contained in:
Vishal Banthia 2015-12-14 14:02:20 +09:00
parent d67d06a19f
commit a6266931ad
10 changed files with 354 additions and 239 deletions

View file

@ -874,6 +874,9 @@ program
.option('-c, --connect-push <endpoint>'
, 'push endpoint'
, cliutil.list)
.option('-u, --connect-sub <endpoint>'
, 'sub endpoint'
, cliutil.list)
.action(function(options) {
if (!options.secret) {
this.missingArgument('--secret')
@ -881,6 +884,9 @@ program
if (!options.connectPush) {
this.missingArgument('--connect-push')
}
if (!options.connectSub) {
this.missingArgument('--connect-sub')
}
require('./units/api')({
port: options.port
@ -888,6 +894,7 @@ program
, secret: options.secret
, endpoints: {
push: options.connectPush
, sub: options.connectSub
}
})
})
@ -1338,6 +1345,7 @@ program
, '--port', options.apiPort
, '--secret', options.authSecret
, '--connect-push', options.bindAppPull
, '--connect-sub', options.bindAppPub
])
// websocket
, procutil.fork(__filename, [

View file

@ -152,7 +152,7 @@ dbapi.saveDeviceInitialState = function(serial, device) {
, ready: false
, reverseForwards: []
, remoteConnect: false
, remoteDebugUrl: null
, remoteConnectUrl: null
}
return db.run(r.table('devices').get(serial).update(data))
.then(function(stats) {
@ -167,14 +167,14 @@ dbapi.saveDeviceInitialState = function(serial, device) {
dbapi.setDeviceConnectUrl = function(serial, url) {
return db.run(r.table('devices').get(serial).update({
remoteDebugUrl: url
remoteConnectUrl: url
, remoteConnect: true
}))
}
dbapi.unsetDeviceConnectUrl = function(serial, url) {
return db.run(r.table('devices').get(serial).update({
remoteDebugUrl: null
remoteConnectUrl: null
, remoteConnect: false
}))
}

View file

@ -49,9 +49,14 @@ function getDeviceBySerial(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
datautil.normalize(device, req.user)
if(fields) {
device = _.pick(device, fields.split(','))
}
@ -60,13 +65,6 @@ function getDeviceBySerial(req, res) {
success: true
, device: device
})
}
else {
res.status(404).json({
success: false
, description: 'Device not found'
})
}
})
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)

View file

@ -1,13 +1,16 @@
var util = require('util')
var Promise = require('bluebird')
var _ = require('lodash')
var Promise = require('bluebird')
var uuid = require('node-uuid')
var dbapi = require('../../../db/api')
var logger = require('../../../util/logger')
var datautil = require('../../../util/datautil')
var deviceutil = require('../../../util/deviceutil')
var wire = require('../../../wire')
var wireutil = require('../../../wire/util')
var wirerouter = require('../../../wire/router')
var log = logger.createLogger('api:controllers:user')
@ -66,10 +69,21 @@ function getUserDeviceBySerial(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
datautil.normalize(device, req.user)
if (!deviceutil.isOwnedByUser(device, req.user)) {
return res.status(403).json({
success: false
, description: 'Device is not owned by you'
})
}
if (device.owner && device.owner.email === req.user.email) {
if(fields) {
device = _.pick(device, fields.split(','))
}
@ -78,20 +92,6 @@ function getUserDeviceBySerial(req, res) {
success: true
, device: device
})
}
else {
res.status(401).json({
success: false
, description: 'Device is not owned by you'
})
}
}
else {
res.status(404).json({
success: false
, description: 'Device not found'
})
}
})
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)
@ -107,16 +107,45 @@ function addUserDevice(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if(device.present && device.ready && !device.using && !device.owner) {
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
var requirements = {
'serial': {
'value': serial,
'match': 'exact'
datautil.normalize(device, req.user)
if (!deviceutil.isAddable(device, req.user)) {
return res.status(403).json({
success: false
, description: 'Device is being used or not available'
})
}
// Timer will be called if no JoinGroupMessage is received till 5 seconds
var responseTimer = setTimeout(function() {
req.options.channelRouter.removeListener(wireutil.global, messageListener)
return res.status(504).json({
success: false
, description: 'Device is not responding'
})
}, 5000)
var messageListener = wirerouter()
.on(wire.JoinGroupMessage, function(channel, message) {
if (message.serial === serial && message.owner.email === req.user.email) {
clearTimeout(responseTimer)
req.options.channelRouter.removeListener(wireutil.global, messageListener)
return res.json({
success: true
, description: 'Device successfully added'
})
}
})
.handler()
req.options.channelRouter.on(wireutil.global, messageListener)
req.options.push.send([
device.channel
@ -128,34 +157,21 @@ function addUserDevice(req, res) {
, req.user.group
)
, timeout
, wireutil.toDeviceRequirements(requirements)
, wireutil.toDeviceRequirements({
'serial': {
'value': serial
, 'match': 'exact'
}
})
)
)
])
res.status(202).json({
success: true
, description: 'Device Add request is accepted. Check if device is successfully added using pollingUrl'
, pollingUrl: util.format('%s://%s%s/user/devices/%s'
, req.protocol
, req.get('host')
, req.swagger.operation.api.basePath
, serial
)
})
} else {
res.status(401).json({
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)
res.status(500).json({
success: false
, description: 'Device is being used or not available'
})
}
} else {
res.status(404).json({
success: false
, description: 'Bad device serial'
})
}
})
}
@ -164,49 +180,66 @@ function deleteUserDeviceBySerial(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if(device.using && device.owner.email == req.user.email) {
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
datautil.normalize(device, req.user)
if (!deviceutil.isOwnedByUser(device, req.user)) {
return res.status(403).json({
success: false
, description: 'You cannot release this device. Not owned by you'
})
var requirements = {
'serial': {
'value': serial,
'match': 'exact'
}
// Timer will be called if no JoinGroupMessage is received till 5 seconds
var responseTimer = setTimeout(function() {
req.options.channelRouter.removeListener(wireutil.global, messageListener)
return res.status(504).json({
success: false
, description: 'Device is not responding'
})
}, 5000)
var messageListener = wirerouter()
.on(wire.LeaveGroupMessage, function(channel, message) {
if (message.serial === serial && message.owner.email === req.user.email) {
clearTimeout(responseTimer)
req.options.channelRouter.removeListener(wireutil.global, messageListener)
return res.json({
success: true
, description: 'Device successfully removed'
})
}
})
.handler()
req.options.channelRouter.on(wireutil.global, messageListener)
req.options.push.send([
device.channel
, wireutil.envelope(
new wire.UngroupMessage(
wireutil.toDeviceRequirements(requirements)
wireutil.toDeviceRequirements({
'serial': {
'value': serial
, 'match': 'exact'
}
})
)
)
])
res.status(202).json({
success: true
, description: 'Device Release request is accepted. Check if device is successfully removed using pollingUrl'
, pollingUrl: util.format('%s://%s%s/user/devices/%s'
, req.protocol
, req.get('host')
, req.swagger.operation.api.basePath
, serial
)
})
} else {
res.status(401).json({
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)
res.status(500).json({
success: false
, description: 'You cannot kick this device'
})
}
} else {
res.status(404).json({
success: false
, description: 'Bad device serial'
})
}
})
}
@ -215,41 +248,58 @@ function remoteConnectUserDeviceBySerial(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if (device.present && device.ready && device.using && device.owner.email === req.user.email) {
req.options.push.send([
device.channel
, wireutil.envelope(
new wire.ConnectStartMessage()
)
])
res.status(202).json({
success: true
, description: 'Device Connect request is accepted. Check if device is successfully connected using pollingUrl'
, pollingUrl: util.format('%s://%s%s/user/devices/%s'
, req.protocol
, req.get('host')
, req.swagger.operation.api.basePath
, serial
)
})
}
else {
res.status(401).json({
success: false
, description: 'Device is not owned by you or is not available'
})
}
}
else {
res.status(404).json({
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
datautil.normalize(device, req.user)
if (!deviceutil.isOwnedByUser(device, req.user)) {
return res.status(403).json({
success: false
, description: 'Device is not owned by you or is not available'
})
}
var responseChannel = 'txn_' + uuid.v4()
req.options.sub.subscribe(responseChannel)
// Timer will be called if no JoinGroupMessage is received till 5 seconds
var timer = setTimeout(function() {
req.options.channelRouter.removeListener(responseChannel, messageListener)
req.options.sub.unsubscribe(responseChannel)
return res.status(504).json({
success: false
, description: 'Device is not responding'
})
}, 5000)
var messageListener = wirerouter()
.on(wire.ConnectStartedMessage, function(channel, message) {
if (message.serial === serial) {
clearTimeout(timer)
req.options.sub.unsubscribe(responseChannel)
req.options.channelRouter.removeListener(responseChannel, messageListener)
return res.json({
success: true
, remoteConnectUrl: message.url
})
}
})
.handler()
req.options.channelRouter.on(responseChannel, messageListener)
req.options.push.send([
device.channel
, wireutil.transaction(
responseChannel
, new wire.ConnectStartMessage()
)
])
})
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)
@ -264,41 +314,59 @@ function remoteDisconnectUserDeviceBySerial(req, res) {
dbapi.loadDevice(serial)
.then(function(device) {
if (device) {
datautil.normalize(device, req.user)
if (device.present && device.ready && device.using && device.owner.email == req.user.email) {
req.options.push.send([
device.channel
, wireutil.envelope(
new wire.ConnectStopMessage()
)
])
res.status(202).json({
success: true
, description: 'Device Disonnect request is accepted. Check if device is successfully disconnected using pollingUrl'
, pollingUrl: util.format('%s://%s%s/user/devices/%s'
, req.protocol
, req.get('host')
, req.swagger.operation.api.basePath
, serial
)
})
}
else {
res.status(401).json({
success: false
, description: 'Device is not owned by you or is not available'
})
}
}
else {
res.status(404).json({
if (!device) {
return res.status(404).json({
success: false
, description: 'Device not found'
})
}
datautil.normalize(device, req.user)
if (!deviceutil.isOwnedByUser(device, req.user)) {
return res.status(403).json({
success: false
, description: 'Device is not owned by you or is not available'
})
}
var responseChannel = 'txn_' + uuid.v4()
req.options.sub.subscribe(responseChannel)
// Timer will be called if no JoinGroupMessage is received till 5 seconds
var timer = setTimeout(function() {
req.options.channelRouter.removeListener(responseChannel, messageListener)
req.options.sub.unsubscribe(responseChannel)
return res.status(504).json({
success: false
, description: 'Device is not responding'
})
}, 5000)
var messageListener = wirerouter()
.on(wire.ConnectStoppedMessage, function(channel, message) {
if (message.serial === serial) {
clearTimeout(timer)
req.options.sub.unsubscribe(responseChannel)
req.options.channelRouter.removeListener(responseChannel, messageListener)
return res.json({
success: true
, description: 'Device remote disconnected successfully'
})
}
})
.handler()
req.options.channelRouter.on(responseChannel, messageListener)
req.options.push.send([
device.channel
, wireutil.transaction(
responseChannel
, new wire.ConnectStopMessage()
)
])
})
.catch(function(err) {
log.error('Failed to load device "%s": ', req.params.serial, err.stack)

View file

@ -1,5 +1,6 @@
var http = require('http')
var path = require('path')
var events = require('events')
var express = require('express')
var SwaggerExpress = require('swagger-express-mw')
@ -11,11 +12,13 @@ var logger = require('../../util/logger')
var zmqutil = require('../../util/zmqutil')
var srv = require('../../util/srv')
var lifecycle = require('../../util/lifecycle')
var wireutil = require('../../wire/util')
module.exports = function(options) {
var log = logger.createLogger('api')
, app = express()
, server = http.createServer(app)
, channelRouter = new events.EventEmitter()
var push = zmqutil.socket('push')
Promise.map(options.endpoints.push, function(endpoint) {
@ -27,18 +30,47 @@ module.exports = function(options) {
})
})
})
.catch(function(err) {
.catch(function(err) {w
log.fatal('Unable to connect to push endpoint', err)
lifecycle.fatal()
})
// Input
var sub = zmqutil.socket('sub')
Promise.map(options.endpoints.sub, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('Receiving input from "%s"', record.url)
sub.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to sub endpoint', err)
lifecycle.fatal()
})
// Establish always-on channels
;[wireutil.global].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
})
sub.on('message', function(channel, data) {
channelRouter.emit(channel.toString(), channel, data)
})
// Swagger Express Config
var config = {
appRoot: __dirname
, swaggerFile: path.resolve(__dirname, 'swagger', 'api_v1.yaml')
};
SwaggerExpress.create(config, function(err, swaggerExpress) {
if (err) { throw err; }
if (err) {
throw err
}
swaggerExpress.register(app);
})
@ -47,6 +79,8 @@ module.exports = function(options) {
app.use(function(req, res, next) {
var reqOptions = _.merge(options, {
'push': push
, 'sub': sub
, 'channelRouter': channelRouter
})
req.options = reqOptions
@ -59,6 +93,17 @@ module.exports = function(options) {
, keys: [options.secret]
}))
lifecycle.observe(function() {
[push, sub].forEach(function(sock) {
try {
sock.close()
}
catch (err) {
// No-op
}
})
})
server.listen(options.port)
log.info('Listening on port %d', options.port)
}

View file

@ -46,7 +46,7 @@ paths:
/user/devices:
x-swagger-router-controller: user
get:
summary: List devices owned by current user
summary: User Devices
description: The User Devices endpoint returns device list owner by current authorized user
operationId: getUserDevices
tags:
@ -70,22 +70,20 @@ paths:
- accessTokenAuth: []
post:
summary: Add a device to a user
description: The User Devices endpoint will request stf server for a new device. It will return request accepted if device is usable
description: The User Devices endpoint will request stf server for a new device.
operationId: addUserDevice
tags:
- user
parameters:
- name: devices
- name: device
in: body
description: Device to add
required: true
schema:
$ref: "#/definitions/AddUserDevicePayload"
responses:
"202":
description: Add User Device Request Status and polling Url
schema:
$ref: "#/definitions/AddUserDeviceResponse"
"200":
description: Add User Device Status
default:
description: Unexpected Error
schema:
@ -95,8 +93,8 @@ paths:
/user/devices/{serial}:
x-swagger-router-controller: user
get:
summary: Device Information
description: The device enpoint return information about device owned by user
summary: User Device
description: The devices enpoint return information about device owned by user
operationId: getUserDeviceBySerial
tags:
- user
@ -113,9 +111,9 @@ paths:
type: string
responses:
"200":
description: Delete User Device Request Status and polling Url
description: Device Information owned by user
schema:
$ref: "#/definitions/DeleteUserDeviceBySerialResponse"
$ref: "#/definitions/DeviceResponse"
default:
description: Unexpected Error
schema:
@ -123,7 +121,7 @@ paths:
security:
- accessTokenAuth: []
delete:
summary: Release device from user
summary: Delete User Device
description: The User Devices endpoint will request for device release from stf server. It will return request accepted if device is being used by current user
operationId: deleteUserDeviceBySerial
tags:
@ -135,8 +133,8 @@ paths:
required: true
type: string
responses:
"202":
description: Device Release Request Status
"200":
description: Delete User Device Status
default:
description: Unexpected Error
schema:
@ -160,7 +158,7 @@ paths:
required: true
type: string
responses:
"202":
"200":
description: Remote Connect User Device Request Status
schema:
$ref: "#/definitions/RemoteConnectUserDeviceResponse"
@ -183,10 +181,8 @@ paths:
required: true
type: string
responses:
"202":
"200":
description: Remote Disonnect User Device Request Status
schema:
$ref: "#/definitions/RemoteDisconnectUserDeviceResponse"
default:
description: Unexpected Error
schema:
@ -194,7 +190,7 @@ paths:
security:
- accessTokenAuth: []
/user/accessTokens:
x-swagger-router-controller: token
x-swagger-router-controller: user
get:
summary: Access Tokens
description: The Access Tokens endpoints returns titles of all the valid access tokens
@ -298,35 +294,14 @@ definitions:
properties:
device:
type: object
AddUserDeviceResponse:
required:
- pollingUrl
properties:
pollingUrl:
type: string
DeleteUserDeviceBySerialResponse:
required:
- pollingUrl
properties:
pollingUrl:
type: string
RemoteDisconnectUserDeviceResponse:
required:
- pollingUrl
properties:
pollingUrl:
type: string
RemoteConnectUserDeviceResponse:
required:
- pollingUrl
- remoteConnectUrl
- serial
properties:
pollingUrl:
remoteConnectUrl:
type: string
ErrorResponse:
required:
- message
properties:
message:
serial:
type: string
AddUserDevicePayload:
description: payload object for adding device to user
@ -339,7 +314,12 @@ definitions:
timeout:
description: Device timeout in ms. If device is kept idle for this period, it will be automatically disconnected. Default is provider group timeout
type: integer
ErrorResponse:
required:
- message
properties:
message:
type: string
securityDefinitions:
accessTokenAuth:
type: apiKey

View file

@ -142,15 +142,16 @@ module.exports = syrup.serial()
channel
, reply.okay(url)
])
// Update DB
push.send([
wireutil.global
channel
, wireutil.envelope(new wire.ConnectStartedMessage(
options.serial
, url
))
])
log.info('Remote Connect Started for device "%s" at "%s"', options.serial, url)
log.important('Remote Connect Started for device "%s" at "%s"', options.serial, url)
})
.catch(function(err) {
log.error('Unable to start remote connect service', err.stack)
@ -170,12 +171,12 @@ module.exports = syrup.serial()
])
// Update DB
push.send([
wireutil.global
channel
, wireutil.envelope(new wire.ConnectStoppedMessage(
options.serial
))
])
log.info('Remote Connect Stopped for device "%s"', options.serial)
log.important('Remote Connect Stopped for device "%s"', options.serial)
})
.catch(function(err) {
log.error('Failed to stop connect service', err.stack)

View file

@ -167,9 +167,11 @@ module.exports = function(options) {
})
.on(wire.ConnectStartedMessage, function(channel, message, data) {
dbapi.setDeviceConnectUrl(message.serial, message.url)
appDealer.send([channel, data])
})
.on(wire.ConnectStoppedMessage, function(channel, message, data) {
dbapi.unsetDeviceConnectUrl(message.serial)
appDealer.send([channel, data])
})
.on(wire.JoinGroupMessage, function(channel, message, data) {
dbapi.setDeviceOwner(message.serial, message.owner)

View file

@ -59,7 +59,7 @@ datautil.applyOwnerOnlyInfo = function(device, user) {
if (device.owner && device.owner.email === user.email) {
} else {
device.remoteConnect = false
device.remoteDebugUrl = null
device.remoteConnectUrl = null
}
}

13
lib/util/deviceutil.js Normal file
View file

@ -0,0 +1,13 @@
var logger = require('./logger')
var log = logger.createLogger('util:deviceutil')
var deviceutil = module.exports = Object.create(null)
deviceutil.isOwnedByUser = function(device, user) {
return device.present && device.ready && device.owner && device.owner.email === user.email && device.using
}
deviceutil.isAddable = function(device, user) {
return device.present && device.ready && !device.using && !device.owner
}