1
0
Fork 0
mirror of https://github.com/Chocobozzz/PeerTube.git synced 2025-10-05 02:39:33 +02:00

Move to promises

Closes https://github.com/Chocobozzz/PeerTube/issues/74
This commit is contained in:
Chocobozzz 2017-07-05 13:26:25 +02:00
parent 5fe7e89831
commit 6fcd19ba73
88 changed files with 1980 additions and 2505 deletions

View file

@ -1,5 +1,4 @@
import * as express from 'express'
import * as waterfall from 'async/waterfall'
import { database as db } from '../../../initializers/database'
import { checkSignature, signatureValidator } from '../../../middlewares'
@ -24,17 +23,10 @@ export {
function removePods (req: express.Request, res: express.Response, next: express.NextFunction) {
const host = req.body.signature.host
waterfall([
function loadPod (callback) {
db.Pod.loadByHost(host, callback)
},
function deletePod (pod, callback) {
pod.destroy().asCallback(callback)
}
], function (err) {
if (err) return next(err)
return res.type('json').status(204).end()
})
db.Pod.loadByHost(host)
.then(pod => {
return pod.destroy()
})
.then(() => res.type('json').status(204).end())
.catch(err => next(err))
}

View file

@ -1,6 +1,5 @@
import * as express from 'express'
import * as Sequelize from 'sequelize'
import { eachSeries, waterfall } from 'async'
import * as Promise from 'bluebird'
import { database as db } from '../../../initializers/database'
import {
@ -16,20 +15,14 @@ import {
remoteQaduVideosValidator,
remoteEventsVideosValidator
} from '../../../middlewares'
import {
logger,
commitTransaction,
retryTransactionWrapper,
rollbackTransaction,
startSerializableTransaction
} from '../../../helpers'
import { logger, retryTransactionWrapper } from '../../../helpers'
import { quickAndDirtyUpdatesVideoToFriends } from '../../../lib'
import { PodInstance, VideoInstance } from '../../../models'
const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
// Functions to call when processing a remote request
const functionsHash = {}
const functionsHash: { [ id: string ]: (...args) => Promise<any> } = {}
functionsHash[ENDPOINT_ACTIONS.ADD] = addRemoteVideoRetryWrapper
functionsHash[ENDPOINT_ACTIONS.UPDATE] = updateRemoteVideoRetryWrapper
functionsHash[ENDPOINT_ACTIONS.REMOVE] = removeRemoteVideo
@ -72,20 +65,19 @@ function remoteVideos (req: express.Request, res: express.Response, next: expres
// We need to process in the same order to keep consistency
// TODO: optimization
eachSeries(requests, function (request: any, callbackEach) {
Promise.mapSeries(requests, (request: any) => {
const data = request.data
// Get the function we need to call in order to process the request
const fun = functionsHash[request.type]
if (fun === undefined) {
logger.error('Unkown remote request type %s.', request.type)
return callbackEach(null)
return
}
fun.call(this, data, fromPod, callbackEach)
}, function (err) {
if (err) logger.error('Error managing remote videos.', { error: err })
return fun.call(this, data, fromPod)
})
.catch(err => logger.error('Error managing remote videos.', { error: err }))
// We don't need to keep the other pod waiting
return res.type('json').status(204).end()
@ -95,13 +87,12 @@ function remoteVideosQadu (req: express.Request, res: express.Response, next: ex
const requests = req.body.data
const fromPod = res.locals.secure.pod
eachSeries(requests, function (request: any, callbackEach) {
Promise.mapSeries(requests, (request: any) => {
const videoData = request.data
quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod, callbackEach)
}, function (err) {
if (err) logger.error('Error managing remote videos.', { error: err })
return quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod)
})
.catch(err => logger.error('Error managing remote videos.', { error: err }))
return res.type('json').status(204).end()
}
@ -110,414 +101,303 @@ function remoteVideosEvents (req: express.Request, res: express.Response, next:
const requests = req.body.data
const fromPod = res.locals.secure.pod
eachSeries(requests, function (request: any, callbackEach) {
Promise.mapSeries(requests, (request: any) => {
const eventData = request.data
processVideosEventsRetryWrapper(eventData, fromPod, callbackEach)
}, function (err) {
if (err) logger.error('Error managing remote videos.', { error: err })
return processVideosEventsRetryWrapper(eventData, fromPod)
})
.catch(err => logger.error('Error managing remote videos.', { error: err }))
return res.type('json').status(204).end()
}
function processVideosEventsRetryWrapper (eventData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function processVideosEventsRetryWrapper (eventData: any, fromPod: PodInstance) {
const options = {
arguments: [ eventData, fromPod ],
errorMessage: 'Cannot process videos events with many retries.'
}
retryTransactionWrapper(processVideosEvents, options, finalCallback)
return retryTransactionWrapper(processVideosEvents, options)
}
function processVideosEvents (eventData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
waterfall([
startSerializableTransaction,
function processVideosEvents (eventData: any, fromPod: PodInstance) {
function findVideo (t, callback) {
fetchOwnedVideo(eventData.remoteId, function (err, videoInstance) {
return callback(err, t, videoInstance)
})
},
return db.sequelize.transaction(t => {
return fetchOwnedVideo(eventData.remoteId)
.then(videoInstance => {
const options = { transaction: t }
function updateVideoIntoDB (t, videoInstance, callback) {
const options = { transaction: t }
let columnToUpdate
let qaduType
let columnToUpdate
let qaduType
switch (eventData.eventType) {
case REQUEST_VIDEO_EVENT_TYPES.VIEWS:
columnToUpdate = 'views'
qaduType = REQUEST_VIDEO_QADU_TYPES.VIEWS
break
switch (eventData.eventType) {
case REQUEST_VIDEO_EVENT_TYPES.VIEWS:
columnToUpdate = 'views'
qaduType = REQUEST_VIDEO_QADU_TYPES.VIEWS
break
case REQUEST_VIDEO_EVENT_TYPES.LIKES:
columnToUpdate = 'likes'
qaduType = REQUEST_VIDEO_QADU_TYPES.LIKES
break
case REQUEST_VIDEO_EVENT_TYPES.LIKES:
columnToUpdate = 'likes'
qaduType = REQUEST_VIDEO_QADU_TYPES.LIKES
break
case REQUEST_VIDEO_EVENT_TYPES.DISLIKES:
columnToUpdate = 'dislikes'
qaduType = REQUEST_VIDEO_QADU_TYPES.DISLIKES
break
case REQUEST_VIDEO_EVENT_TYPES.DISLIKES:
columnToUpdate = 'dislikes'
qaduType = REQUEST_VIDEO_QADU_TYPES.DISLIKES
break
default:
return callback(new Error('Unknown video event type.'))
}
const query = {}
query[columnToUpdate] = eventData.count
videoInstance.increment(query, options).asCallback(function (err) {
return callback(err, t, videoInstance, qaduType)
})
},
function sendQaduToFriends (t, videoInstance, qaduType, callback) {
const qadusParams = [
{
videoId: videoInstance.id,
type: qaduType
default:
throw new Error('Unknown video event type.')
}
]
quickAndDirtyUpdatesVideoToFriends(qadusParams, t, function (err) {
return callback(err, t)
const query = {}
query[columnToUpdate] = eventData.count
return videoInstance.increment(query, options).then(() => ({ videoInstance, qaduType }))
})
},
.then(({ videoInstance, qaduType }) => {
const qadusParams = [
{
videoId: videoInstance.id,
type: qaduType
}
]
commitTransaction
], function (err: Error, t: Sequelize.Transaction) {
if (err) {
logger.debug('Cannot process a video event.', { error: err })
return rollbackTransaction(err, t, finalCallback)
}
logger.info('Remote video event processed for video %s.', eventData.remoteId)
return finalCallback(null)
return quickAndDirtyUpdatesVideoToFriends(qadusParams, t)
})
})
.then(() => logger.info('Remote video event processed for video %s.', eventData.remoteId))
.catch(err => {
logger.debug('Cannot process a video event.', { error: err })
throw err
})
}
function quickAndDirtyUpdateVideoRetryWrapper (videoData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function quickAndDirtyUpdateVideoRetryWrapper (videoData: any, fromPod: PodInstance) {
const options = {
arguments: [ videoData, fromPod ],
errorMessage: 'Cannot update quick and dirty the remote video with many retries.'
}
retryTransactionWrapper(quickAndDirtyUpdateVideo, options, finalCallback)
return retryTransactionWrapper(quickAndDirtyUpdateVideo, options)
}
function quickAndDirtyUpdateVideo (videoData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function quickAndDirtyUpdateVideo (videoData: any, fromPod: PodInstance) {
let videoName
waterfall([
startSerializableTransaction,
return db.sequelize.transaction(t => {
return fetchRemoteVideo(fromPod.host, videoData.remoteId)
.then(videoInstance => {
const options = { transaction: t }
function findVideo (t, callback) {
fetchRemoteVideo(fromPod.host, videoData.remoteId, function (err, videoInstance) {
return callback(err, t, videoInstance)
videoName = videoInstance.name
if (videoData.views) {
videoInstance.set('views', videoData.views)
}
if (videoData.likes) {
videoInstance.set('likes', videoData.likes)
}
if (videoData.dislikes) {
videoInstance.set('dislikes', videoData.dislikes)
}
return videoInstance.save(options)
})
},
function updateVideoIntoDB (t, videoInstance, callback) {
const options = { transaction: t }
videoName = videoInstance.name
if (videoData.views) {
videoInstance.set('views', videoData.views)
}
if (videoData.likes) {
videoInstance.set('likes', videoData.likes)
}
if (videoData.dislikes) {
videoInstance.set('dislikes', videoData.dislikes)
}
videoInstance.save(options).asCallback(function (err) {
return callback(err, t)
})
},
commitTransaction
], function (err: Error, t: Sequelize.Transaction) {
if (err) {
logger.debug('Cannot quick and dirty update the remote video.', { error: err })
return rollbackTransaction(err, t, finalCallback)
}
logger.info('Remote video %s quick and dirty updated', videoName)
return finalCallback(null)
})
.then(() => logger.info('Remote video %s quick and dirty updated', videoName))
.catch(err => logger.debug('Cannot quick and dirty update the remote video.', { error: err }))
}
// Handle retries on fail
function addRemoteVideoRetryWrapper (videoToCreateData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function addRemoteVideoRetryWrapper (videoToCreateData: any, fromPod: PodInstance) {
const options = {
arguments: [ videoToCreateData, fromPod ],
errorMessage: 'Cannot insert the remote video with many retries.'
}
retryTransactionWrapper(addRemoteVideo, options, finalCallback)
return retryTransactionWrapper(addRemoteVideo, options)
}
function addRemoteVideo (videoToCreateData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function addRemoteVideo (videoToCreateData: any, fromPod: PodInstance) {
logger.debug('Adding remote video "%s".', videoToCreateData.remoteId)
waterfall([
return db.sequelize.transaction(t => {
return db.Video.loadByHostAndRemoteId(fromPod.host, videoToCreateData.remoteId)
.then(video => {
if (video) throw new Error('RemoteId and host pair is not unique.')
startSerializableTransaction,
function assertRemoteIdAndHostUnique (t, callback) {
db.Video.loadByHostAndRemoteId(fromPod.host, videoToCreateData.remoteId, function (err, video) {
if (err) return callback(err)
if (video) return callback(new Error('RemoteId and host pair is not unique.'))
return callback(null, t)
return undefined
})
},
.then(() => {
const name = videoToCreateData.author
const podId = fromPod.id
// This author is from another pod so we do not associate a user
const userId = null
function findOrCreateAuthor (t, callback) {
const name = videoToCreateData.author
const podId = fromPod.id
// This author is from another pod so we do not associate a user
const userId = null
db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
return callback(err, t, authorInstance)
return db.Author.findOrCreateAuthor(name, podId, userId, t)
})
},
.then(author => {
const tags = videoToCreateData.tags
function findOrCreateTags (t, author, callback) {
const tags = videoToCreateData.tags
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
return callback(err, t, author, tagInstances)
return db.Tag.findOrCreateTags(tags, t).then(tagInstances => ({ author, tagInstances }))
})
},
function createVideoObject (t, author, tagInstances, callback) {
const videoData = {
name: videoToCreateData.name,
remoteId: videoToCreateData.remoteId,
extname: videoToCreateData.extname,
infoHash: videoToCreateData.infoHash,
category: videoToCreateData.category,
licence: videoToCreateData.licence,
language: videoToCreateData.language,
nsfw: videoToCreateData.nsfw,
description: videoToCreateData.description,
authorId: author.id,
duration: videoToCreateData.duration,
createdAt: videoToCreateData.createdAt,
// FIXME: updatedAt does not seems to be considered by Sequelize
updatedAt: videoToCreateData.updatedAt,
views: videoToCreateData.views,
likes: videoToCreateData.likes,
dislikes: videoToCreateData.dislikes
}
const video = db.Video.build(videoData)
return callback(null, t, tagInstances, video)
},
function generateThumbnail (t, tagInstances, video, callback) {
db.Video.generateThumbnailFromData(video, videoToCreateData.thumbnailData, function (err) {
if (err) {
logger.error('Cannot generate thumbnail from data.', { error: err })
return callback(err)
.then(({ author, tagInstances }) => {
const videoData = {
name: videoToCreateData.name,
remoteId: videoToCreateData.remoteId,
extname: videoToCreateData.extname,
infoHash: videoToCreateData.infoHash,
category: videoToCreateData.category,
licence: videoToCreateData.licence,
language: videoToCreateData.language,
nsfw: videoToCreateData.nsfw,
description: videoToCreateData.description,
authorId: author.id,
duration: videoToCreateData.duration,
createdAt: videoToCreateData.createdAt,
// FIXME: updatedAt does not seems to be considered by Sequelize
updatedAt: videoToCreateData.updatedAt,
views: videoToCreateData.views,
likes: videoToCreateData.likes,
dislikes: videoToCreateData.dislikes
}
return callback(err, t, tagInstances, video)
const video = db.Video.build(videoData)
return { tagInstances, video }
})
},
function insertVideoIntoDB (t, tagInstances, video, callback) {
const options = {
transaction: t
}
video.save(options).asCallback(function (err, videoCreated) {
return callback(err, t, tagInstances, videoCreated)
.then(({ tagInstances, video }) => {
return db.Video.generateThumbnailFromData(video, videoToCreateData.thumbnailData).then(() => ({ tagInstances, video }))
})
},
.then(({ tagInstances, video }) => {
const options = {
transaction: t
}
function associateTagsToVideo (t, tagInstances, video, callback) {
const options = {
transaction: t
}
video.setTags(tagInstances, options).asCallback(function (err) {
return callback(err, t)
return video.save(options).then(videoCreated => ({ tagInstances, videoCreated }))
})
},
.then(({ tagInstances, videoCreated }) => {
const options = {
transaction: t
}
commitTransaction
], function (err: Error, t: Sequelize.Transaction) {
if (err) {
// This is just a debug because we will retry the insert
logger.debug('Cannot insert the remote video.', { error: err })
return rollbackTransaction(err, t, finalCallback)
}
logger.info('Remote video %s inserted.', videoToCreateData.name)
return finalCallback(null)
return videoCreated.setTags(tagInstances, options)
})
})
.then(() => logger.info('Remote video %s inserted.', videoToCreateData.name))
.catch(err => {
logger.debug('Cannot insert the remote video.', { error: err })
throw err
})
}
// Handle retries on fail
function updateRemoteVideoRetryWrapper (videoAttributesToUpdate: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function updateRemoteVideoRetryWrapper (videoAttributesToUpdate: any, fromPod: PodInstance) {
const options = {
arguments: [ videoAttributesToUpdate, fromPod ],
errorMessage: 'Cannot update the remote video with many retries'
}
retryTransactionWrapper(updateRemoteVideo, options, finalCallback)
return retryTransactionWrapper(updateRemoteVideo, options)
}
function updateRemoteVideo (videoAttributesToUpdate: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
function updateRemoteVideo (videoAttributesToUpdate: any, fromPod: PodInstance) {
logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId)
waterfall([
return db.sequelize.transaction(t => {
return fetchRemoteVideo(fromPod.host, videoAttributesToUpdate.remoteId)
.then(videoInstance => {
const tags = videoAttributesToUpdate.tags
startSerializableTransaction,
function findVideo (t, callback) {
fetchRemoteVideo(fromPod.host, videoAttributesToUpdate.remoteId, function (err, videoInstance) {
return callback(err, t, videoInstance)
return db.Tag.findOrCreateTags(tags, t).then(tagInstances => ({ videoInstance, tagInstances }))
})
},
.then(({ videoInstance, tagInstances }) => {
const options = { transaction: t }
function findOrCreateTags (t, videoInstance, callback) {
const tags = videoAttributesToUpdate.tags
videoInstance.set('name', videoAttributesToUpdate.name)
videoInstance.set('category', videoAttributesToUpdate.category)
videoInstance.set('licence', videoAttributesToUpdate.licence)
videoInstance.set('language', videoAttributesToUpdate.language)
videoInstance.set('nsfw', videoAttributesToUpdate.nsfw)
videoInstance.set('description', videoAttributesToUpdate.description)
videoInstance.set('infoHash', videoAttributesToUpdate.infoHash)
videoInstance.set('duration', videoAttributesToUpdate.duration)
videoInstance.set('createdAt', videoAttributesToUpdate.createdAt)
videoInstance.set('updatedAt', videoAttributesToUpdate.updatedAt)
videoInstance.set('extname', videoAttributesToUpdate.extname)
videoInstance.set('views', videoAttributesToUpdate.views)
videoInstance.set('likes', videoAttributesToUpdate.likes)
videoInstance.set('dislikes', videoAttributesToUpdate.dislikes)
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
return callback(err, t, videoInstance, tagInstances)
return videoInstance.save(options).then(() => ({ videoInstance, tagInstances }))
})
},
.then(({ videoInstance, tagInstances }) => {
const options = { transaction: t }
function updateVideoIntoDB (t, videoInstance, tagInstances, callback) {
const options = { transaction: t }
videoInstance.set('name', videoAttributesToUpdate.name)
videoInstance.set('category', videoAttributesToUpdate.category)
videoInstance.set('licence', videoAttributesToUpdate.licence)
videoInstance.set('language', videoAttributesToUpdate.language)
videoInstance.set('nsfw', videoAttributesToUpdate.nsfw)
videoInstance.set('description', videoAttributesToUpdate.description)
videoInstance.set('infoHash', videoAttributesToUpdate.infoHash)
videoInstance.set('duration', videoAttributesToUpdate.duration)
videoInstance.set('createdAt', videoAttributesToUpdate.createdAt)
videoInstance.set('updatedAt', videoAttributesToUpdate.updatedAt)
videoInstance.set('extname', videoAttributesToUpdate.extname)
videoInstance.set('views', videoAttributesToUpdate.views)
videoInstance.set('likes', videoAttributesToUpdate.likes)
videoInstance.set('dislikes', videoAttributesToUpdate.dislikes)
videoInstance.save(options).asCallback(function (err) {
return callback(err, t, videoInstance, tagInstances)
return videoInstance.setTags(tagInstances, options)
})
},
function associateTagsToVideo (t, videoInstance, tagInstances, callback) {
const options = { transaction: t }
videoInstance.setTags(tagInstances, options).asCallback(function (err) {
return callback(err, t)
})
},
commitTransaction
], function (err: Error, t: Sequelize.Transaction) {
if (err) {
// This is just a debug because we will retry the insert
logger.debug('Cannot update the remote video.', { error: err })
return rollbackTransaction(err, t, finalCallback)
}
logger.info('Remote video %s updated', videoAttributesToUpdate.name)
return finalCallback(null)
})
.then(() => logger.info('Remote video %s updated', videoAttributesToUpdate.name))
.catch(err => {
// This is just a debug because we will retry the insert
logger.debug('Cannot update the remote video.', { error: err })
throw err
})
}
function removeRemoteVideo (videoToRemoveData: any, fromPod: PodInstance, callback: (err: Error) => void) {
function removeRemoteVideo (videoToRemoveData: any, fromPod: PodInstance) {
// We need the instance because we have to remove some other stuffs (thumbnail etc)
fetchRemoteVideo(fromPod.host, videoToRemoveData.remoteId, function (err, video) {
// Do not return the error, continue the process
if (err) return callback(null)
logger.debug('Removing remote video %s.', video.remoteId)
video.destroy().asCallback(function (err) {
// Do not return the error, continue the process
if (err) {
logger.error('Cannot remove remote video with id %s.', videoToRemoveData.remoteId, { error: err })
}
return callback(null)
return fetchRemoteVideo(fromPod.host, videoToRemoveData.remoteId)
.then(video => {
logger.debug('Removing remote video %s.', video.remoteId)
return video.destroy()
})
.catch(err => {
logger.debug('Could not fetch remote video.', { host: fromPod.host, remoteId: videoToRemoveData.remoteId, error: err })
})
})
}
function reportAbuseRemoteVideo (reportData: any, fromPod: PodInstance, callback: (err: Error) => void) {
fetchOwnedVideo(reportData.videoRemoteId, function (err, video) {
if (err || !video) {
if (!err) err = new Error('video not found')
function reportAbuseRemoteVideo (reportData: any, fromPod: PodInstance) {
return fetchOwnedVideo(reportData.videoRemoteId)
.then(video => {
logger.debug('Reporting remote abuse for video %s.', video.id)
logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId })
// Do not return the error, continue the process
return callback(null)
}
logger.debug('Reporting remote abuse for video %s.', video.id)
const videoAbuseData = {
reporterUsername: reportData.reporterUsername,
reason: reportData.reportReason,
reporterPodId: fromPod.id,
videoId: video.id
}
db.VideoAbuse.create(videoAbuseData).asCallback(function (err) {
if (err) {
logger.error('Cannot create remote abuse video.', { error: err })
const videoAbuseData = {
reporterUsername: reportData.reporterUsername,
reason: reportData.reportReason,
reporterPodId: fromPod.id,
videoId: video.id
}
return callback(null)
return db.VideoAbuse.create(videoAbuseData)
})
})
.catch(err => logger.error('Cannot create remote abuse video.', { error: err }))
}
function fetchOwnedVideo (id: string, callback: (err: Error, video?: VideoInstance) => void) {
db.Video.load(id, function (err, video) {
if (err || !video) {
if (!err) err = new Error('video not found')
function fetchOwnedVideo (id: string) {
return db.Video.load(id)
.then(video => {
if (!video) throw new Error('Video not found')
return video
})
.catch(err => {
logger.error('Cannot load owned video from id.', { error: err, id })
return callback(err)
}
return callback(null, video)
})
throw err
})
}
function fetchRemoteVideo (podHost: string, remoteId: string, callback: (err: Error, video?: VideoInstance) => void) {
db.Video.loadByHostAndRemoteId(podHost, remoteId, function (err, video) {
if (err || !video) {
if (!err) err = new Error('video not found')
function fetchRemoteVideo (podHost: string, remoteId: string) {
return db.Video.loadByHostAndRemoteId(podHost, remoteId)
.then(video => {
if (!video) throw new Error('Video not found')
return video
})
.catch(err => {
logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId })
return callback(err)
}
return callback(null, video)
})
throw err
})
}