mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-05 02:39:33 +02:00
First version with PostgreSQL
This commit is contained in:
parent
108626609e
commit
feb4bdfd9b
68 changed files with 1171 additions and 730 deletions
|
@ -2,66 +2,58 @@
|
|||
|
||||
const each = require('async/each')
|
||||
const eachLimit = require('async/eachLimit')
|
||||
const values = require('lodash/values')
|
||||
const mongoose = require('mongoose')
|
||||
const waterfall = require('async/waterfall')
|
||||
|
||||
const constants = require('../initializers/constants')
|
||||
const logger = require('../helpers/logger')
|
||||
const requests = require('../helpers/requests')
|
||||
|
||||
const Pod = mongoose.model('Pod')
|
||||
|
||||
let timer = null
|
||||
let lastRequestTimestamp = 0
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const RequestSchema = mongoose.Schema({
|
||||
request: mongoose.Schema.Types.Mixed,
|
||||
endpoint: {
|
||||
type: String,
|
||||
enum: [ values(constants.REQUEST_ENDPOINTS) ]
|
||||
},
|
||||
to: [
|
||||
module.exports = function (sequelize, DataTypes) {
|
||||
const Request = sequelize.define('Request',
|
||||
{
|
||||
type: mongoose.Schema.Types.ObjectId,
|
||||
ref: 'Pod'
|
||||
}
|
||||
]
|
||||
})
|
||||
request: {
|
||||
type: DataTypes.JSON
|
||||
},
|
||||
endpoint: {
|
||||
// TODO: enum?
|
||||
type: DataTypes.STRING
|
||||
}
|
||||
},
|
||||
{
|
||||
classMethods: {
|
||||
associate,
|
||||
|
||||
RequestSchema.statics = {
|
||||
activate,
|
||||
deactivate,
|
||||
flush,
|
||||
forceSend,
|
||||
list,
|
||||
remainingMilliSeconds
|
||||
activate,
|
||||
countTotalRequests,
|
||||
deactivate,
|
||||
flush,
|
||||
forceSend,
|
||||
remainingMilliSeconds
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return Request
|
||||
}
|
||||
|
||||
RequestSchema.pre('save', function (next) {
|
||||
const self = this
|
||||
|
||||
if (self.to.length === 0) {
|
||||
Pod.listAllIds(function (err, podIds) {
|
||||
if (err) return next(err)
|
||||
|
||||
// No friends
|
||||
if (podIds.length === 0) return
|
||||
|
||||
self.to = podIds
|
||||
return next()
|
||||
})
|
||||
} else {
|
||||
return next()
|
||||
}
|
||||
})
|
||||
|
||||
mongoose.model('Request', RequestSchema)
|
||||
|
||||
// ------------------------------ STATICS ------------------------------
|
||||
|
||||
function associate (models) {
|
||||
this.belongsToMany(models.Pod, {
|
||||
foreignKey: {
|
||||
name: 'requestId',
|
||||
allowNull: false
|
||||
},
|
||||
through: models.RequestToPod,
|
||||
onDelete: 'CASCADE'
|
||||
})
|
||||
}
|
||||
|
||||
function activate () {
|
||||
logger.info('Requests scheduler activated.')
|
||||
lastRequestTimestamp = Date.now()
|
||||
|
@ -73,6 +65,14 @@ function activate () {
|
|||
}, constants.REQUESTS_INTERVAL)
|
||||
}
|
||||
|
||||
function countTotalRequests (callback) {
|
||||
const query = {
|
||||
include: [ this.sequelize.models.Pod ]
|
||||
}
|
||||
|
||||
return this.count(query).asCallback(callback)
|
||||
}
|
||||
|
||||
function deactivate () {
|
||||
logger.info('Requests scheduler deactivated.')
|
||||
clearInterval(timer)
|
||||
|
@ -90,10 +90,6 @@ function forceSend () {
|
|||
makeRequests.call(this)
|
||||
}
|
||||
|
||||
function list (callback) {
|
||||
this.find({ }, callback)
|
||||
}
|
||||
|
||||
function remainingMilliSeconds () {
|
||||
if (timer === null) return -1
|
||||
|
||||
|
@ -136,6 +132,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
|
|||
// Make all the requests of the scheduler
|
||||
function makeRequests () {
|
||||
const self = this
|
||||
const RequestToPod = this.sequelize.models.RequestToPod
|
||||
|
||||
// We limit the size of the requests (REQUESTS_LIMIT)
|
||||
// We don't want to stuck with the same failing requests so we get a random list
|
||||
|
@ -156,20 +153,20 @@ function makeRequests () {
|
|||
// We want to group requests by destinations pod and endpoint
|
||||
const requestsToMakeGrouped = {}
|
||||
|
||||
requests.forEach(function (poolRequest) {
|
||||
poolRequest.to.forEach(function (toPodId) {
|
||||
const hashKey = toPodId + poolRequest.endpoint
|
||||
requests.forEach(function (request) {
|
||||
request.Pods.forEach(function (toPod) {
|
||||
const hashKey = toPod.id + request.endpoint
|
||||
if (!requestsToMakeGrouped[hashKey]) {
|
||||
requestsToMakeGrouped[hashKey] = {
|
||||
toPodId,
|
||||
endpoint: poolRequest.endpoint,
|
||||
ids: [], // pool request ids, to delete them from the DB in the future
|
||||
toPodId: toPod.id,
|
||||
endpoint: request.endpoint,
|
||||
ids: [], // request ids, to delete them from the DB in the future
|
||||
datas: [] // requests data,
|
||||
}
|
||||
}
|
||||
|
||||
requestsToMakeGrouped[hashKey].ids.push(poolRequest._id)
|
||||
requestsToMakeGrouped[hashKey].datas.push(poolRequest.request)
|
||||
requestsToMakeGrouped[hashKey].ids.push(request.id)
|
||||
requestsToMakeGrouped[hashKey].datas.push(request.request)
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -179,8 +176,8 @@ function makeRequests () {
|
|||
eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
|
||||
const requestToMake = requestsToMakeGrouped[hashKey]
|
||||
|
||||
// FIXME: mongodb request inside a loop :/
|
||||
Pod.load(requestToMake.toPodId, function (err, toPod) {
|
||||
// FIXME: SQL request inside a loop :/
|
||||
self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
|
||||
if (err) {
|
||||
logger.error('Error finding pod by id.', { err: err })
|
||||
return callbackEach()
|
||||
|
@ -191,7 +188,7 @@ function makeRequests () {
|
|||
const requestIdsToDelete = requestToMake.ids
|
||||
|
||||
logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
|
||||
removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
|
||||
RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
|
||||
return callbackEach()
|
||||
}
|
||||
|
||||
|
@ -202,7 +199,7 @@ function makeRequests () {
|
|||
goodPods.push(requestToMake.toPodId)
|
||||
|
||||
// Remove the pod id of these request ids
|
||||
removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach)
|
||||
RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
|
||||
} else {
|
||||
badPods.push(requestToMake.toPodId)
|
||||
callbackEach()
|
||||
|
@ -211,18 +208,22 @@ function makeRequests () {
|
|||
})
|
||||
}, function () {
|
||||
// All the requests were made, we update the pods score
|
||||
updatePodsScore(goodPods, badPods)
|
||||
updatePodsScore.call(self, goodPods, badPods)
|
||||
// Flush requests with no pod
|
||||
removeWithEmptyTo.call(self)
|
||||
removeWithEmptyTo.call(self, function (err) {
|
||||
if (err) logger.error('Error when removing requests with no pods.', { error: err })
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Remove pods with a score of 0 (too many requests where they were unreachable)
|
||||
function removeBadPods () {
|
||||
const self = this
|
||||
|
||||
waterfall([
|
||||
function findBadPods (callback) {
|
||||
Pod.listBadPods(function (err, pods) {
|
||||
self.sequelize.models.Pod.listBadPods(function (err, pods) {
|
||||
if (err) {
|
||||
logger.error('Cannot find bad pods.', { error: err })
|
||||
return callback(err)
|
||||
|
@ -233,10 +234,8 @@ function removeBadPods () {
|
|||
},
|
||||
|
||||
function removeTheseBadPods (pods, callback) {
|
||||
if (pods.length === 0) return callback(null, 0)
|
||||
|
||||
each(pods, function (pod, callbackEach) {
|
||||
pod.remove(callbackEach)
|
||||
pod.destroy().asCallback(callbackEach)
|
||||
}, function (err) {
|
||||
return callback(err, pods.length)
|
||||
})
|
||||
|
@ -253,43 +252,67 @@ function removeBadPods () {
|
|||
}
|
||||
|
||||
function updatePodsScore (goodPods, badPods) {
|
||||
const self = this
|
||||
const Pod = this.sequelize.models.Pod
|
||||
|
||||
logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
|
||||
|
||||
Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
|
||||
if (err) logger.error('Cannot increment scores of good pods.')
|
||||
})
|
||||
if (goodPods.length !== 0) {
|
||||
Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
|
||||
if (err) logger.error('Cannot increment scores of good pods.')
|
||||
})
|
||||
}
|
||||
|
||||
Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
|
||||
if (err) logger.error('Cannot decrement scores of bad pods.')
|
||||
removeBadPods()
|
||||
})
|
||||
if (badPods.length !== 0) {
|
||||
Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
|
||||
if (err) logger.error('Cannot decrement scores of bad pods.')
|
||||
removeBadPods.call(self)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function listWithLimitAndRandom (limit, callback) {
|
||||
const self = this
|
||||
|
||||
self.count(function (err, count) {
|
||||
self.count().asCallback(function (err, count) {
|
||||
if (err) return callback(err)
|
||||
|
||||
// Optimization...
|
||||
if (count === 0) return callback(null, [])
|
||||
|
||||
let start = Math.floor(Math.random() * count) - limit
|
||||
if (start < 0) start = 0
|
||||
|
||||
self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback)
|
||||
const query = {
|
||||
order: [
|
||||
[ 'id', 'ASC' ]
|
||||
],
|
||||
offset: start,
|
||||
limit: limit,
|
||||
include: [ this.sequelize.models.Pod ]
|
||||
}
|
||||
|
||||
self.findAll(query).asCallback(callback)
|
||||
})
|
||||
}
|
||||
|
||||
function removeAll (callback) {
|
||||
this.remove({ }, callback)
|
||||
}
|
||||
|
||||
function removePodOf (requestsIds, podId, callback) {
|
||||
if (!callback) callback = function () {}
|
||||
|
||||
this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
|
||||
// Delete all requests
|
||||
this.destroy({ truncate: true }).asCallback(callback)
|
||||
}
|
||||
|
||||
function removeWithEmptyTo (callback) {
|
||||
if (!callback) callback = function () {}
|
||||
|
||||
this.remove({ to: { $size: 0 } }, callback)
|
||||
const query = {
|
||||
where: {
|
||||
id: {
|
||||
$notIn: [
|
||||
this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.destroy(query).asCallback(callback)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue