diff --git a/lib/db/index.js b/lib/db/index.js index 55377fd7..bf3dbe81 100644 --- a/lib/db/index.js +++ b/lib/db/index.js @@ -1,42 +1,100 @@ var r = require('rethinkdb') +var Promise = require('bluebird') var setup = require('./setup') var logger = require('../util/logger') var lifecycle = require('../util/lifecycle') +var srv = require('../util/srv') var db = module.exports = Object.create(null) var log = logger.createLogger('db') function connect() { - return r.connect({ - // These environment variables are exposed when we --link to a - // RethinkDB container. - host: process.env.RETHINKDB_PORT_28015_TCP_ADDR || '127.0.0.1' - , port: process.env.RETHINKDB_PORT_28015_TCP_PORT || 28015 - , db: process.env.RETHINKDB_ENV_DATABASE || 'stf' - , authKey: process.env.RETHINKDB_ENV_AUTHKEY - }) - .then(function(conn) { - lifecycle.observe(function() { - return db.close() - }) + var options = { + // These environment variables are exposed when we --link to a + // RethinkDB container. + host: process.env.RETHINKDB_PORT_28015_TCP_ADDR || '127.0.0.1' + , port: process.env.RETHINKDB_PORT_28015_TCP_PORT || 28015 + , db: process.env.RETHINKDB_ENV_DATABASE || 'stf' + , authKey: process.env.RETHINKDB_ENV_AUTHKEY + } - return conn.on('error', function(err) { - log.fatal('Connection error', err.stack) - lifecycle.fatal() - }) - }) - .catch(function(err) { - log.fatal('Unable to connect to the database: "%s"', err.message) - lifecycle.fatal() + return srv.resolve(options.host, options.port) + .then(function(records) { + function next() { + var record = records.shift() + + if (!record) { + throw new Error('No hosts left to try') + } + + log.info('Connecting to %s:%d', record.name, record.port) + + return r.connect({ + host: record.name + , port: record.port + , db: options.db + , authKey: options.authKey + }) + .catch(r.Error.RqlDriverError, function() { + log.info('Unable to connect to %s:%d', record.name, record.port) + return next() + }) + } + + return next() }) } -// Export memoized connection as a Promise +// Export connection as a Promise db.connect = (function() { - var connection = connect() + var connection + , queue = [] + + lifecycle.observe(function() { + if (connection) { + return connection.close() + } + }) + + function createConnection() { + return connect() + .then(function(conn) { + connection = conn + + conn.on('close', function closeListener() { + log.warn('Connection closed') + connection = null + conn.removeListener('close', closeListener) + createConnection() + }) + + queue.splice(0).forEach(function(resolver) { + resolver.resolve(conn) + }) + + return conn + }) + .catch(function(err) { + log.fatal(err.message) + lifecycle.fatal() + }) + } + + createConnection() + return function() { - return connection + return new Promise(function(resolve, reject) { + if (connection) { + resolve(connection) + } + else { + queue.push({ + resolve: resolve + , reject: reject + }) + } + }) } })() diff --git a/lib/util/srv.js b/lib/util/srv.js new file mode 100644 index 00000000..2a3d1ea7 --- /dev/null +++ b/lib/util/srv.js @@ -0,0 +1,79 @@ +var Promise = require('bluebird') +var dns = Promise.promisifyAll(require('dns')) + +function groupByPriority(records) { + function sortByPriority(a, b) { + return a.priority - b.priority + } + + return records.sort(sortByPriority).reduce(function(acc, record) { + if (acc.length) { + var last = acc[acc.length - 1] + if (last[0].priority !== record.priority) { + acc.push([record]) + } + else { + last.push(record) + } + } + else { + acc.push([record]) + } + return acc + }, []) +} + +function shuffleWeighted(records) { + function sortByWeight(a, b) { + return b.weight - a.weight + } + + function totalWeight(records) { + return records.reduce(function(sum, record) { + return sum + record.weight + }, 0) + } + + function pick(records, sum) { + var rand = Math.random() * sum + , counter = 0 + + for (var i = 0, l = records.length; i < l; ++i) { + counter += records[i].weight + if (rand < counter) { + var picked = records.splice(i, 1) + return picked.concat(pick(records, sum - picked[0].weight)) + } + } + + return [] + } + + return pick(records.sort(sortByWeight), totalWeight(records)) +} + +function flatten(groupedRecords) { + return groupedRecords.reduce(function(acc, group) { + return acc.concat(group) + }, []) +} + +var RE_SRV = /^srv:(.*)$/ + +module.exports.sort = function(records) { + return flatten(groupByPriority(records).map(shuffleWeighted)) +} + +module.exports.resolve = function(domain, defaultPort) { + var match + if ((match = RE_SRV.exec(domain))) { + return dns.resolveSrvAsync(match[1]) + .then(module.exports.sort) + } + else { + return Promise.resolve([{ + name: domain + , port: defaultPort + }]) + } +}