diff --git a/lib/units/device/support/push.js b/lib/units/device/support/push.js index 098685ae..d8564f13 100644 --- a/lib/units/device/support/push.js +++ b/lib/units/device/support/push.js @@ -12,15 +12,15 @@ module.exports = syrup.serial() // Output var push = zmq.socket('push') - Promise.map(options.endpoints.push, function(endpoint) { - return srv.resolve(endpoint).then(function(records) { - return srv.attempt(records, function(record) { - log.info('Sending output to "%s"', record.url) - push.connect(record.url) - return Promise.resolve(true) + + return Promise.map(options.endpoints.push, function(endpoint) { + return srv.resolve(endpoint).then(function(records) { + return srv.attempt(records, function(record) { + log.info('Sending output to "%s"', record.url) + push.connect(record.url) + return Promise.resolve(true) + }) }) }) - }) - - return push + .return(push) }) diff --git a/lib/units/device/support/sub.js b/lib/units/device/support/sub.js index 802cce8a..39a226fb 100644 --- a/lib/units/device/support/sub.js +++ b/lib/units/device/support/sub.js @@ -6,6 +6,7 @@ var Promise = require('bluebird') var logger = require('../../../util/logger') var wireutil = require('../../../wire/util') var srv = require('../../../util/srv') +var lifecycle = require('../../../util/lifecycle') module.exports = syrup.serial() .define(function(options) { @@ -13,21 +14,22 @@ module.exports = syrup.serial() // Input var sub = zmq.socket('sub') - Promise.map(options.endpoints.sub, function(endpoint) { - return srv.resolve(endpoint).then(function(records) { - return srv.attempt(records, function(record) { - log.info('Receiving input from "%s"', record.url) - sub.connect(record.url) - return Promise.resolve(true) + + return Promise.map(options.endpoints.sub, function(endpoint) { + return srv.resolve(endpoint).then(function(records) { + return srv.attempt(records, function(record) { + log.info('Receiving input from "%s"', record.url) + sub.connect(record.url) + return Promise.resolve(true) + }) }) }) - }) - - // Establish always-on channels - ;[wireutil.global].forEach(function(channel) { - log.info('Subscribing to permanent channel "%s"', channel) - sub.subscribe(channel) - }) - - return sub + .then(function() { + // Establish always-on channels + ;[wireutil.global].forEach(function(channel) { + log.info('Subscribing to permanent channel "%s"', channel) + sub.subscribe(channel) + }) + }) + .return(sub) }) diff --git a/lib/units/processor/index.js b/lib/units/processor/index.js index c11110d6..a82bbe11 100644 --- a/lib/units/processor/index.js +++ b/lib/units/processor/index.js @@ -27,6 +27,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to app dealer endpoint', err) + lifecycle.fatal() + }) appDealer.on('message', function(channel, data) { devDealer.send([channel, data]) @@ -43,6 +47,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to dev dealer endpoint', err) + lifecycle.fatal() + }) devDealer.on('message', wirerouter() // Provider messages diff --git a/lib/units/provider/index.js b/lib/units/provider/index.js index 3f43115e..daed1754 100644 --- a/lib/units/provider/index.js +++ b/lib/units/provider/index.js @@ -81,6 +81,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to push endpoint', err) + lifecycle.fatal() + }) // Input var sub = zmq.socket('sub') @@ -93,6 +97,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to sub endpoint', err) + lifecycle.fatal() + }) // Establish always-on channels ;[solo].forEach(function(channel) { diff --git a/lib/units/reaper/index.js b/lib/units/reaper/index.js index 7ee574cc..a285cb44 100644 --- a/lib/units/reaper/index.js +++ b/lib/units/reaper/index.js @@ -27,6 +27,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to push endpoint', err) + lifecycle.fatal() + }) function reap() { dbapi.getDeadDevices(options.heartbeatTimeout) diff --git a/lib/units/websocket/index.js b/lib/units/websocket/index.js index 8484952e..d39f8883 100644 --- a/lib/units/websocket/index.js +++ b/lib/units/websocket/index.js @@ -16,6 +16,7 @@ var wirerouter = require('../../wire/router') var dbapi = require('../../db/api') var datautil = require('../../util/datautil') var srv = require('../../util/srv') +var lifecycle = require('../../util/lifecycle') var cookieSession = require('./middleware/cookie-session') var ip = require('./middleware/remote-ip') var auth = require('./middleware/auth') @@ -40,6 +41,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to push endpoint', err) + lifecycle.fatal() + }) // Input var sub = zmq.socket('sub') @@ -52,6 +57,10 @@ module.exports = function(options) { }) }) }) + .catch(function(err) { + log.fatal('Unable to connect to sub endpoint', err) + lifecycle.fatal() + }) // Establish always-on channels ;[wireutil.global].forEach(function(channel) { @@ -818,6 +827,17 @@ module.exports = function(options) { }) }) + lifecycle.observe(function() { + [push, sub].forEach(function(sock) { + try { + sock.close() + } + catch (err) { + // No-op + } + }) + }) + server.listen(options.port) log.info('Listening on port %d', options.port) }