diff --git a/server/core/helpers/database-utils.ts b/server/core/helpers/database-utils.ts index 30ecfa6f2..94ab78e2f 100644 --- a/server/core/helpers/database-utils.ts +++ b/server/core/helpers/database-utils.ts @@ -5,59 +5,59 @@ import { Model } from 'sequelize-typescript' import { sequelizeTypescript } from '@server/initializers/database.js' import { logger } from './logger.js' -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: (arg1: A, arg2: B, arg3: C, arg4: D) => Promise, arg1: A, arg2: B, arg3: C, - arg4: D, + arg4: D ): Promise -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: (arg1: A, arg2: B, arg3: C) => Promise, arg1: A, arg2: B, arg3: C ): Promise -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: (arg1: A, arg2: B) => Promise, arg1: A, arg2: B ): Promise -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: (arg1: A) => Promise, arg1: A ): Promise -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: () => Promise | Bluebird ): Promise -function retryTransactionWrapper ( +function retryTransactionWrapper ( functionToRetry: (...args: any[]) => Promise, ...args: any[] ): Promise { return transactionRetryer(callback => { functionToRetry.apply(null, args) - .then((result: T) => callback(null, result)) - .catch(err => callback(err)) - }) - .catch(err => { - logger.warn(`Cannot execute ${functionToRetry.name} with many retries.`, { err }) - throw err + .then((result: T) => callback(null, result)) + .catch(err => callback(err)) }) + .catch(err => { + logger.warn(`Cannot execute ${functionToRetry.name || 'function'} with many retries.`, { err }) + throw err + }) } -function transactionRetryer (func: (err: any, data: T) => any) { +function transactionRetryer (func: (err: any, data: T) => any) { return new Promise((res, rej) => { retry( { times: 5, errorFilter: err => { - const willRetry = (err.name === 'SequelizeDatabaseError') + const willRetry = err.name === 'SequelizeDatabaseError' logger.debug('Maybe retrying the transaction function.', { willRetry, err, tags: [ 'sql', 'retry' ] }) return willRetry } @@ -68,7 +68,7 @@ function transactionRetryer (func: (err: any, data: T) => any) { }) } -function saveInTransactionWithRetries > (model: T) { +function saveInTransactionWithRetries> (model: T) { const changedKeys = model.changed() || [] return retryTransactionWrapper(() => { @@ -89,24 +89,24 @@ function saveInTransactionWithRetries // --------------------------------------------------------------------------- -function resetSequelizeInstance (instance: Model) { +function resetSequelizeInstance (instance: Model) { return instance.reload() } -function filterNonExistingModels ( +function filterNonExistingModels ( fromDatabase: T[], newModels: T[] ) { return fromDatabase.filter(f => !newModels.find(newModel => newModel.hasSameUniqueKeysThan(f))) } -function deleteAllModels > (models: T[], transaction: Transaction) { +function deleteAllModels> (models: T[], transaction: Transaction) { return Promise.all(models.map(f => f.destroy({ transaction }))) } // --------------------------------------------------------------------------- -function runInReadCommittedTransaction (fn: (t: Transaction) => Promise) { +function runInReadCommittedTransaction (fn: (t: Transaction) => Promise) { const options = { isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED } return sequelizeTypescript.transaction(options, t => fn(t)) diff --git a/server/core/lib/runners/runner.ts b/server/core/lib/runners/runner.ts index 2486fc8db..7e242ff65 100644 --- a/server/core/lib/runners/runner.ts +++ b/server/core/lib/runners/runner.ts @@ -1,8 +1,7 @@ import { RunnerJobState, RunnerJobStateType } from '@peertube/peertube-models' -import { retryTransactionWrapper } from '@server/helpers/database-utils.js' +import { runInReadCommittedTransaction } from '@server/helpers/database-utils.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { RUNNER_JOBS } from '@server/initializers/constants.js' -import { sequelizeTypescript } from '@server/initializers/database.js' import { MRunner, MRunnerJob } from '@server/types/models/runners/index.js' import express from 'express' @@ -10,7 +9,7 @@ const lTags = loggerTagsFactory('runner') const updatingRunner = new Set() -function updateLastRunnerContact (req: express.Request, runner: MRunner) { +export function updateLastRunnerContact (req: express.Request, runner: MRunner) { const now = new Date() // Don't update last runner contact too often @@ -24,15 +23,13 @@ function updateLastRunnerContact (req: express.Request, runner: MRunner) { logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name)) - retryTransactionWrapper(() => { - return sequelizeTypescript.transaction(async transaction => { - return runner.save({ transaction }) - }) + return runInReadCommittedTransaction(async transaction => { + return runner.save({ transaction }) }).catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) })) .finally(() => updatingRunner.delete(runner.id)) } -function runnerJobCanBeCancelled (runnerJob: MRunnerJob) { +export function runnerJobCanBeCancelled (runnerJob: MRunnerJob) { const allowedStates = new Set([ RunnerJobState.PENDING, RunnerJobState.PROCESSING, @@ -41,8 +38,3 @@ function runnerJobCanBeCancelled (runnerJob: MRunnerJob) { return allowedStates.has(runnerJob.state) } - -export { - updateLastRunnerContact, - runnerJobCanBeCancelled -}