mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-04 02:09:37 +02:00
Use bullmq job dependency
This commit is contained in:
parent
5a921e7b74
commit
bd911b54b5
42 changed files with 314 additions and 152 deletions
|
@ -1,4 +1,6 @@
|
|||
import {
|
||||
FlowJob,
|
||||
FlowProducer,
|
||||
Job,
|
||||
JobsOptions,
|
||||
Queue,
|
||||
|
@ -13,7 +15,7 @@ import {
|
|||
import { jobStates } from '@server/helpers/custom-validators/jobs'
|
||||
import { CONFIG } from '@server/initializers/config'
|
||||
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
|
||||
import { timeoutPromise } from '@shared/core-utils'
|
||||
import { pick, timeoutPromise } from '@shared/core-utils'
|
||||
import {
|
||||
ActivitypubFollowPayload,
|
||||
ActivitypubHttpBroadcastPayload,
|
||||
|
@ -22,10 +24,12 @@ import {
|
|||
ActorKeysPayload,
|
||||
DeleteResumableUploadMetaFilePayload,
|
||||
EmailPayload,
|
||||
FederateVideoPayload,
|
||||
JobState,
|
||||
JobType,
|
||||
ManageVideoTorrentPayload,
|
||||
MoveObjectStoragePayload,
|
||||
NotifyPayload,
|
||||
RefreshPayload,
|
||||
VideoFileImportPayload,
|
||||
VideoImportPayload,
|
||||
|
@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
|
|||
import { refreshAPObject } from './handlers/activitypub-refresher'
|
||||
import { processActorKeys } from './handlers/actor-keys'
|
||||
import { processEmail } from './handlers/email'
|
||||
import { processFederateVideo } from './handlers/federate-video'
|
||||
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
|
||||
import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
|
||||
import { processNotify } from './handlers/notify'
|
||||
import { processVideoFileImport } from './handlers/video-file-import'
|
||||
import { processVideoImport } from './handlers/video-import'
|
||||
import { processVideoLiveEnding } from './handlers/video-live-ending'
|
||||
|
@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
|
|||
import { processVideoTranscoding } from './handlers/video-transcoding'
|
||||
import { processVideosViewsStats } from './handlers/video-views-stats'
|
||||
|
||||
type CreateJobArgument =
|
||||
export type CreateJobArgument =
|
||||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
|
||||
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
|
||||
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
|
||||
|
@ -73,7 +79,9 @@ type CreateJobArgument =
|
|||
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
|
||||
{ type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
|
||||
{ type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
|
||||
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
|
||||
{ type: 'notify', payload: NotifyPayload } |
|
||||
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
|
||||
{ type: 'federate-video', payload: FederateVideoPayload }
|
||||
|
||||
export type CreateJobOptions = {
|
||||
delay?: number
|
||||
|
@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
|
|||
'video-redundancy': processVideoRedundancy,
|
||||
'move-to-object-storage': processMoveToObjectStorage,
|
||||
'manage-video-torrent': processManageVideoTorrent,
|
||||
'video-studio-edition': processVideoStudioEdition
|
||||
'notify': processNotify,
|
||||
'video-studio-edition': processVideoStudioEdition,
|
||||
'federate-video': processFederateVideo
|
||||
}
|
||||
|
||||
const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
|
||||
|
@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
|
|||
'video-live-ending',
|
||||
'move-to-object-storage',
|
||||
'manage-video-torrent',
|
||||
'video-studio-edition'
|
||||
'video-studio-edition',
|
||||
'notify',
|
||||
'federate-video'
|
||||
]
|
||||
|
||||
const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
|
||||
|
@ -137,6 +149,8 @@ class JobQueue {
|
|||
private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
|
||||
private queueEvents: { [id in JobType]?: QueueEvents } = {}
|
||||
|
||||
private flowProducer: FlowProducer
|
||||
|
||||
private initialized = false
|
||||
private jobRedisPrefix: string
|
||||
|
||||
|
@ -157,6 +171,11 @@ class JobQueue {
|
|||
this.buildQueueEvent(handlerName, produceOnly)
|
||||
}
|
||||
|
||||
this.flowProducer = new FlowProducer({
|
||||
connection: this.getRedisConnection(),
|
||||
prefix: this.jobRedisPrefix
|
||||
})
|
||||
|
||||
this.addRepeatableJobs()
|
||||
}
|
||||
|
||||
|
@ -243,6 +262,8 @@ class JobQueue {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async terminate () {
|
||||
const promises = Object.keys(this.workers)
|
||||
.map(handlerName => {
|
||||
|
@ -278,28 +299,56 @@ class JobQueue {
|
|||
}
|
||||
}
|
||||
|
||||
createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
|
||||
this.createJobWithPromise(obj, options)
|
||||
.catch(err => logger.error('Cannot create job.', { err, obj }))
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
|
||||
this.createJob(options)
|
||||
.catch(err => logger.error('Cannot create job.', { err, options }))
|
||||
}
|
||||
|
||||
async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
|
||||
const queue: Queue = this.queues[obj.type]
|
||||
async createJob (options: CreateJobArgument & CreateJobOptions) {
|
||||
const queue: Queue = this.queues[options.type]
|
||||
if (queue === undefined) {
|
||||
logger.error('Unknown queue %s: cannot create job.', obj.type)
|
||||
logger.error('Unknown queue %s: cannot create job.', options.type)
|
||||
return
|
||||
}
|
||||
|
||||
const jobArgs: JobsOptions = {
|
||||
const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
|
||||
|
||||
return queue.add('job', options.payload, jobOptions)
|
||||
}
|
||||
|
||||
async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
|
||||
let lastJob: FlowJob
|
||||
|
||||
for (const job of jobs) {
|
||||
if (!job) continue
|
||||
|
||||
lastJob = {
|
||||
name: 'job',
|
||||
data: job.payload,
|
||||
queueName: job.type,
|
||||
opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
|
||||
children: lastJob
|
||||
? [ lastJob ]
|
||||
: []
|
||||
}
|
||||
}
|
||||
|
||||
return this.flowProducer.add(lastJob)
|
||||
}
|
||||
|
||||
private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
|
||||
return {
|
||||
backoff: { delay: 60 * 1000, type: 'exponential' },
|
||||
attempts: JOB_ATTEMPTS[obj.type],
|
||||
attempts: JOB_ATTEMPTS[type],
|
||||
priority: options.priority,
|
||||
delay: options.delay
|
||||
}
|
||||
|
||||
return queue.add('job', obj.payload, jobArgs)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async listForApi (options: {
|
||||
state?: JobState
|
||||
start: number
|
||||
|
@ -367,6 +416,8 @@ class JobQueue {
|
|||
return Promise.all(promises)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async removeOldJobs () {
|
||||
for (const key of Object.keys(this.queues)) {
|
||||
const queue: Queue = this.queues[key]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue