mirror of
https://github.com/openstf/stf
synced 2025-10-04 10:19:30 +02:00
The DB should now be able to connect to SRV records. It will also attempt to fail over once if a connection closes.
This commit is contained in:
parent
26deca8d9f
commit
23621d0bdd
2 changed files with 160 additions and 23 deletions
|
@ -1,42 +1,100 @@
|
||||||
var r = require('rethinkdb')
|
var r = require('rethinkdb')
|
||||||
|
var Promise = require('bluebird')
|
||||||
|
|
||||||
var setup = require('./setup')
|
var setup = require('./setup')
|
||||||
var logger = require('../util/logger')
|
var logger = require('../util/logger')
|
||||||
var lifecycle = require('../util/lifecycle')
|
var lifecycle = require('../util/lifecycle')
|
||||||
|
var srv = require('../util/srv')
|
||||||
|
|
||||||
var db = module.exports = Object.create(null)
|
var db = module.exports = Object.create(null)
|
||||||
var log = logger.createLogger('db')
|
var log = logger.createLogger('db')
|
||||||
|
|
||||||
function connect() {
|
function connect() {
|
||||||
return r.connect({
|
var options = {
|
||||||
// These environment variables are exposed when we --link to a
|
// These environment variables are exposed when we --link to a
|
||||||
// RethinkDB container.
|
// RethinkDB container.
|
||||||
host: process.env.RETHINKDB_PORT_28015_TCP_ADDR || '127.0.0.1'
|
host: process.env.RETHINKDB_PORT_28015_TCP_ADDR || '127.0.0.1'
|
||||||
, port: process.env.RETHINKDB_PORT_28015_TCP_PORT || 28015
|
, port: process.env.RETHINKDB_PORT_28015_TCP_PORT || 28015
|
||||||
, db: process.env.RETHINKDB_ENV_DATABASE || 'stf'
|
, db: process.env.RETHINKDB_ENV_DATABASE || 'stf'
|
||||||
, authKey: process.env.RETHINKDB_ENV_AUTHKEY
|
, authKey: process.env.RETHINKDB_ENV_AUTHKEY
|
||||||
|
}
|
||||||
|
|
||||||
|
return srv.resolve(options.host, options.port)
|
||||||
|
.then(function(records) {
|
||||||
|
function next() {
|
||||||
|
var record = records.shift()
|
||||||
|
|
||||||
|
if (!record) {
|
||||||
|
throw new Error('No hosts left to try')
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info('Connecting to %s:%d', record.name, record.port)
|
||||||
|
|
||||||
|
return r.connect({
|
||||||
|
host: record.name
|
||||||
|
, port: record.port
|
||||||
|
, db: options.db
|
||||||
|
, authKey: options.authKey
|
||||||
})
|
})
|
||||||
.then(function(conn) {
|
.catch(r.Error.RqlDriverError, function() {
|
||||||
|
log.info('Unable to connect to %s:%d', record.name, record.port)
|
||||||
|
return next()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return next()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export connection as a Promise
|
||||||
|
db.connect = (function() {
|
||||||
|
var connection
|
||||||
|
, queue = []
|
||||||
|
|
||||||
lifecycle.observe(function() {
|
lifecycle.observe(function() {
|
||||||
return db.close()
|
if (connection) {
|
||||||
|
return connection.close()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
return conn.on('error', function(err) {
|
function createConnection() {
|
||||||
log.fatal('Connection error', err.stack)
|
return connect()
|
||||||
lifecycle.fatal()
|
.then(function(conn) {
|
||||||
|
connection = conn
|
||||||
|
|
||||||
|
conn.on('close', function closeListener() {
|
||||||
|
log.warn('Connection closed')
|
||||||
|
connection = null
|
||||||
|
conn.removeListener('close', closeListener)
|
||||||
|
createConnection()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
queue.splice(0).forEach(function(resolver) {
|
||||||
|
resolver.resolve(conn)
|
||||||
|
})
|
||||||
|
|
||||||
|
return conn
|
||||||
})
|
})
|
||||||
.catch(function(err) {
|
.catch(function(err) {
|
||||||
log.fatal('Unable to connect to the database: "%s"', err.message)
|
log.fatal(err.message)
|
||||||
lifecycle.fatal()
|
lifecycle.fatal()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Export memoized connection as a Promise
|
createConnection()
|
||||||
db.connect = (function() {
|
|
||||||
var connection = connect()
|
|
||||||
return function() {
|
return function() {
|
||||||
return connection
|
return new Promise(function(resolve, reject) {
|
||||||
|
if (connection) {
|
||||||
|
resolve(connection)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
queue.push({
|
||||||
|
resolve: resolve
|
||||||
|
, reject: reject
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})()
|
})()
|
||||||
|
|
||||||
|
|
79
lib/util/srv.js
Normal file
79
lib/util/srv.js
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
var Promise = require('bluebird')
|
||||||
|
var dns = Promise.promisifyAll(require('dns'))
|
||||||
|
|
||||||
|
function groupByPriority(records) {
|
||||||
|
function sortByPriority(a, b) {
|
||||||
|
return a.priority - b.priority
|
||||||
|
}
|
||||||
|
|
||||||
|
return records.sort(sortByPriority).reduce(function(acc, record) {
|
||||||
|
if (acc.length) {
|
||||||
|
var last = acc[acc.length - 1]
|
||||||
|
if (last[0].priority !== record.priority) {
|
||||||
|
acc.push([record])
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
last.push(record)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
acc.push([record])
|
||||||
|
}
|
||||||
|
return acc
|
||||||
|
}, [])
|
||||||
|
}
|
||||||
|
|
||||||
|
function shuffleWeighted(records) {
|
||||||
|
function sortByWeight(a, b) {
|
||||||
|
return b.weight - a.weight
|
||||||
|
}
|
||||||
|
|
||||||
|
function totalWeight(records) {
|
||||||
|
return records.reduce(function(sum, record) {
|
||||||
|
return sum + record.weight
|
||||||
|
}, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
function pick(records, sum) {
|
||||||
|
var rand = Math.random() * sum
|
||||||
|
, counter = 0
|
||||||
|
|
||||||
|
for (var i = 0, l = records.length; i < l; ++i) {
|
||||||
|
counter += records[i].weight
|
||||||
|
if (rand < counter) {
|
||||||
|
var picked = records.splice(i, 1)
|
||||||
|
return picked.concat(pick(records, sum - picked[0].weight))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
|
||||||
|
return pick(records.sort(sortByWeight), totalWeight(records))
|
||||||
|
}
|
||||||
|
|
||||||
|
function flatten(groupedRecords) {
|
||||||
|
return groupedRecords.reduce(function(acc, group) {
|
||||||
|
return acc.concat(group)
|
||||||
|
}, [])
|
||||||
|
}
|
||||||
|
|
||||||
|
var RE_SRV = /^srv:(.*)$/
|
||||||
|
|
||||||
|
module.exports.sort = function(records) {
|
||||||
|
return flatten(groupByPriority(records).map(shuffleWeighted))
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.resolve = function(domain, defaultPort) {
|
||||||
|
var match
|
||||||
|
if ((match = RE_SRV.exec(domain))) {
|
||||||
|
return dns.resolveSrvAsync(match[1])
|
||||||
|
.then(module.exports.sort)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Promise.resolve([{
|
||||||
|
name: domain
|
||||||
|
, port: defaultPort
|
||||||
|
}])
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue