diff --git a/lib/units/device/support/push.js b/lib/units/device/support/push.js index d8564f13..c68bf47d 100644 --- a/lib/units/device/support/push.js +++ b/lib/units/device/support/push.js @@ -1,17 +1,17 @@ var syrup = require('stf-syrup') -var zmq = require('zmq') var Promise = require('bluebird') var logger = require('../../../util/logger') var srv = require('../../../util/srv') +var zmqutil = require('../../../util/zmqutil') module.exports = syrup.serial() .define(function(options) { var log = logger.createLogger('device:support:push') // Output - var push = zmq.socket('push') + var push = zmqutil.socket('push') return Promise.map(options.endpoints.push, function(endpoint) { return srv.resolve(endpoint).then(function(records) { diff --git a/lib/units/device/support/sub.js b/lib/units/device/support/sub.js index 39a226fb..b8322bef 100644 --- a/lib/units/device/support/sub.js +++ b/lib/units/device/support/sub.js @@ -1,19 +1,19 @@ var syrup = require('stf-syrup') -var zmq = require('zmq') var Promise = require('bluebird') var logger = require('../../../util/logger') var wireutil = require('../../../wire/util') var srv = require('../../../util/srv') var lifecycle = require('../../../util/lifecycle') +var zmqutil = require('../../../util/zmqutil') module.exports = syrup.serial() .define(function(options) { var log = logger.createLogger('device:support:sub') // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') return Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { diff --git a/lib/units/log/rethinkdb.js b/lib/units/log/rethinkdb.js index 94215b5c..cbb60e88 100644 --- a/lib/units/log/rethinkdb.js +++ b/lib/units/log/rethinkdb.js @@ -1,5 +1,4 @@ var Promise = require('bluebird') -var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') @@ -8,12 +7,13 @@ var wireutil = require('../../wire/util') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') var dbapi = require('../../db/api') +var zmqutil = require('../../util/zmqutil') module.exports = function(options) { var log = logger.createLogger('log-db') // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/units/notify/hipchat.js b/lib/units/notify/hipchat.js index 71e81d7b..c7b9127e 100644 --- a/lib/units/notify/hipchat.js +++ b/lib/units/notify/hipchat.js @@ -2,7 +2,6 @@ var util = require('util') var Hipchatter = require('hipchatter') var Promise = require('bluebird') -var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') @@ -10,6 +9,7 @@ var wirerouter = require('../../wire/router') var wireutil = require('../../wire/util') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') +var zmqutil = require('../../util/zmqutil') var COLORS = { 1: 'gray' @@ -28,7 +28,7 @@ module.exports = function(options) { , timer // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/units/processor/index.js b/lib/units/processor/index.js index de11f7fa..78c974f3 100644 --- a/lib/units/processor/index.js +++ b/lib/units/processor/index.js @@ -1,5 +1,4 @@ var Promise = require('bluebird') -var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') @@ -8,6 +7,7 @@ var wireutil = require('../../wire/util') var dbapi = require('../../db/api') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') +var zmqutil = require('../../util/zmqutil') module.exports = function(options) { var log = logger.createLogger('processor') @@ -17,7 +17,7 @@ module.exports = function(options) { } // App side - var appDealer = zmq.socket('dealer') + var appDealer = zmqutil.socket('dealer') Promise.map(options.endpoints.appDealer, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { @@ -37,7 +37,7 @@ module.exports = function(options) { }) // Device side - var devDealer = zmq.socket('dealer') + var devDealer = zmqutil.socket('dealer') Promise.map(options.endpoints.devDealer, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/units/provider/index.js b/lib/units/provider/index.js index 3d196b41..77e0c14a 100644 --- a/lib/units/provider/index.js +++ b/lib/units/provider/index.js @@ -1,6 +1,5 @@ var adb = require('adbkit') var Promise = require('bluebird') -var zmq = require('zmq') var _ = require('lodash') var EventEmitter = require('eventemitter3').EventEmitter @@ -11,6 +10,7 @@ var wirerouter = require('../../wire/router') var procutil = require('../../util/procutil') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') +var zmqutil = require('../../util/zmqutil') module.exports = function(options) { var log = logger.createLogger('provider') @@ -70,7 +70,7 @@ module.exports = function(options) { })() // Output - var push = zmq.socket('push') + var push = zmqutil.socket('push') Promise.map(options.endpoints.push, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { @@ -86,7 +86,7 @@ module.exports = function(options) { }) // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/units/reaper/index.js b/lib/units/reaper/index.js index cff1dbe0..89c43a24 100644 --- a/lib/units/reaper/index.js +++ b/lib/units/reaper/index.js @@ -1,5 +1,4 @@ var Promise = require('bluebird') -var zmq = require('zmq') var logger = require('../../util/logger') var wire = require('../../wire') @@ -9,6 +8,7 @@ var dbapi = require('../../db/api') var lifecycle = require('../../util/lifecycle') var srv = require('../../util/srv') var TtlSet = require('../../util/ttlset') +var zmqutil = require('../../util/zmqutil') module.exports = function(options) { var log = logger.createLogger('reaper') @@ -19,7 +19,7 @@ module.exports = function(options) { } // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { @@ -41,7 +41,7 @@ module.exports = function(options) { }) // Output - var push = zmq.socket('push') + var push = zmqutil.socket('push') Promise.map(options.endpoints.push, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/units/triproxy/index.js b/lib/units/triproxy/index.js index 0d0112bf..1632d181 100644 --- a/lib/units/triproxy/index.js +++ b/lib/units/triproxy/index.js @@ -1,7 +1,6 @@ -var zmq = require('zmq') - var logger = require('../../util/logger') var lifecycle = require('../../util/lifecycle') +var zmqutil = require('../../util/zmqutil') module.exports = function(options) { var log = logger.createLogger('triproxy') @@ -17,18 +16,18 @@ module.exports = function(options) { } // App/device output - var pub = zmq.socket('pub') + var pub = zmqutil.socket('pub') pub.bindSync(options.endpoints.pub) log.info('PUB socket bound on', options.endpoints.pub) // Coordinator input/output - var dealer = zmq.socket('dealer') + var dealer = zmqutil.socket('dealer') dealer.bindSync(options.endpoints.dealer) dealer.on('message', proxy(pub)) log.info('DEALER socket bound on', options.endpoints.dealer) // App/device input - var pull = zmq.socket('pull') + var pull = zmqutil.socket('pull') pull.bindSync(options.endpoints.pull) pull.on('message', proxy(dealer)) log.info('PULL socket bound on', options.endpoints.pull) diff --git a/lib/units/websocket/index.js b/lib/units/websocket/index.js index 9c0e26b0..f99ea509 100644 --- a/lib/units/websocket/index.js +++ b/lib/units/websocket/index.js @@ -3,7 +3,6 @@ var events = require('events') var util = require('util') var socketio = require('socket.io') -var zmq = require('zmq') var Promise = require('bluebird') var _ = require('lodash') var request = Promise.promisifyAll(require('request')) @@ -17,6 +16,7 @@ var dbapi = require('../../db/api') var datautil = require('../../util/datautil') var srv = require('../../util/srv') var lifecycle = require('../../util/lifecycle') +var zmqutil = require('../../util/zmqutil') var cookieSession = require('./middleware/cookie-session') var ip = require('./middleware/remote-ip') var auth = require('./middleware/auth') @@ -31,7 +31,7 @@ module.exports = function(options) { , channelRouter = new events.EventEmitter() // Output - var push = zmq.socket('push') + var push = zmqutil.socket('push') Promise.map(options.endpoints.push, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { @@ -47,7 +47,7 @@ module.exports = function(options) { }) // Input - var sub = zmq.socket('sub') + var sub = zmqutil.socket('sub') Promise.map(options.endpoints.sub, function(endpoint) { return srv.resolve(endpoint).then(function(records) { return srv.attempt(records, function(record) { diff --git a/lib/util/zmqutil.js b/lib/util/zmqutil.js new file mode 100644 index 00000000..ef316fcb --- /dev/null +++ b/lib/util/zmqutil.js @@ -0,0 +1,14 @@ +// ISSUE-100 (https://github.com/openstf/stf/issues/100) + +// In some networks TCP Connection dies if kept idle for long. +// Setting TCP_KEEPALIVE option true, to all the zmq sockets +// won't let it die + +var zmq = require('zmq') + +module.exports.socket = function() { + var sock = zmq.socket.apply(zmq, arguments) + sock.setsockopt(zmq.ZMQ_TCP_KEEPALIVE, 1) + sock.setsockopt(zmq.ZMQ_TCP_KEEPALIVE_IDLE, 300000) + return sock +}