mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-05 02:39:33 +02:00
Remove uneeded async
This commit is contained in:
parent
ed09acf14b
commit
7a9e420a02
4 changed files with 27 additions and 38 deletions
|
@ -1,4 +1,4 @@
|
|||
import { queue, QueueObject } from 'async'
|
||||
import PQueue from 'p-queue'
|
||||
import { logger } from '@server/helpers/logger'
|
||||
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
|
||||
import { MActorDefault, MActorSignature } from '@server/types/models'
|
||||
|
@ -6,42 +6,33 @@ import { Activity } from '@shared/models'
|
|||
import { StatsManager } from '../stat-manager'
|
||||
import { processActivities } from './process'
|
||||
|
||||
type QueueParam = {
|
||||
activities: Activity[]
|
||||
signatureActor?: MActorSignature
|
||||
inboxActor?: MActorDefault
|
||||
}
|
||||
|
||||
class InboxManager {
|
||||
|
||||
private static instance: InboxManager
|
||||
|
||||
private readonly inboxQueue: QueueObject<QueueParam>
|
||||
private readonly inboxQueue: PQueue
|
||||
|
||||
private constructor () {
|
||||
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
|
||||
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
|
||||
|
||||
processActivities(task.activities, options)
|
||||
.then(() => cb())
|
||||
.catch(err => {
|
||||
logger.error('Error in process activities.', { err })
|
||||
cb()
|
||||
})
|
||||
})
|
||||
this.inboxQueue = new PQueue({ concurrency: 1 })
|
||||
|
||||
setInterval(() => {
|
||||
StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
|
||||
}, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
|
||||
}
|
||||
|
||||
addInboxMessage (options: QueueParam) {
|
||||
this.inboxQueue.push(options)
|
||||
.catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
|
||||
addInboxMessage (param: {
|
||||
activities: Activity[]
|
||||
signatureActor?: MActorSignature
|
||||
inboxActor?: MActorDefault
|
||||
}) {
|
||||
this.inboxQueue.add(() => {
|
||||
const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
|
||||
|
||||
return processActivities(param.activities, options)
|
||||
}).catch(err => logger.error('Error with inbox queue.', { err }))
|
||||
}
|
||||
|
||||
getActivityPubMessagesWaiting () {
|
||||
return this.inboxQueue.length() + this.inboxQueue.running()
|
||||
return this.inboxQueue.size + this.inboxQueue.pending
|
||||
}
|
||||
|
||||
static get Instance () {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue