diff --git a/apps/peertube-runner/src/server/process/shared/common.ts b/apps/peertube-runner/src/server/process/shared/common.ts index 9569c6cd5..52ea0a217 100644 --- a/apps/peertube-runner/src/server/process/shared/common.ts +++ b/apps/peertube-runner/src/server/process/shared/common.ts @@ -61,8 +61,12 @@ export function scheduleTranscodingProgress (options: { : 60000 const update = () => { - server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() }) - .catch(err => logger.error({ err }, 'Cannot send job progress')) + server.runnerJobs.update({ + jobToken: job.jobToken, + jobUUID: job.uuid, + runnerToken, + progress: progressGetter() + }).catch(err => logger.error({ err }, 'Cannot send job progress')) } const interval = setInterval(() => { diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts index fb071953f..f83a39d9b 100644 --- a/apps/peertube-runner/src/server/process/shared/process-live.ts +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts @@ -50,22 +50,23 @@ export class ProcessLiveRTMPHLSTranscoding { logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`) } - process () { - const job = this.options.job - const payload = job.payload + private get payload () { + return this.options.job.payload + } + process () { return new Promise(async (res, rej) => { try { await ensureDir(this.outputPath) - logger.info(`Probing ${payload.input.rtmpUrl}`) - const probe = await ffprobePromise(payload.input.rtmpUrl) - logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) + logger.info(`Probing ${this.payload.input.rtmpUrl}`) + const probe = await ffprobePromise(this.payload.input.rtmpUrl) + logger.info({ probe }, `Probed ${this.payload.input.rtmpUrl}`) - const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) - const hasVideo = await hasVideoStream(payload.input.rtmpUrl, probe) - const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) - const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) + const hasAudio = await hasAudioStream(this.payload.input.rtmpUrl, probe) + const hasVideo = await hasVideoStream(this.payload.input.rtmpUrl, probe) + const bitrate = await getVideoStreamBitrate(this.payload.input.rtmpUrl, probe) + const { ratio } = await getVideoStreamDimensionsInfo(this.payload.input.rtmpUrl, probe) const m3u8Watcher = watch(this.outputPath + '/*.m3u8') this.fsWatchers.push(m3u8Watcher) @@ -107,15 +108,15 @@ export class ProcessLiveRTMPHLSTranscoding { }) this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ - inputUrl: payload.input.rtmpUrl, + inputUrl: this.payload.input.rtmpUrl, outPath: this.outputPath, masterPlaylistName: 'master.m3u8', - segmentListSize: payload.output.segmentListSize, - segmentDuration: payload.output.segmentDuration, + segmentListSize: this.payload.output.segmentListSize, + segmentDuration: this.payload.output.segmentDuration, - toTranscode: payload.output.toTranscode, + toTranscode: this.payload.output.toTranscode, splitAudioAndVideo: true, bitrate, @@ -126,7 +127,7 @@ export class ProcessLiveRTMPHLSTranscoding { probe }) - logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) + logger.info(`Running live transcoding for ${this.payload.input.rtmpUrl}`) this.ffmpegCommand.on('error', (err, stdout, stderr) => { this.onFFmpegError({ err, stdout, stderr }) @@ -241,7 +242,8 @@ export class ProcessLiveRTMPHLSTranscoding { jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, - payload: successBody + payload: successBody, + reqPayload: this.payload }) } @@ -324,7 +326,7 @@ export class ProcessLiveRTMPHLSTranscoding { await Promise.all(parallelPromises) } - private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { + private async updateWithRetry (updatePayload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { if (this.ended || this.errored) return try { @@ -332,7 +334,8 @@ export class ProcessLiveRTMPHLSTranscoding { jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, - payload: payload as any + payload: updatePayload as any, + reqPayload: this.payload }) } catch (err) { if (currentTry >= 3) throw err @@ -341,7 +344,7 @@ export class ProcessLiveRTMPHLSTranscoding { logger.warn({ err }, 'Will retry update after error') await wait(250) - return this.updateWithRetry(payload, currentTry + 1) + return this.updateWithRetry(updatePayload, currentTry + 1) } } diff --git a/apps/peertube-runner/src/server/process/shared/process-studio.ts b/apps/peertube-runner/src/server/process/shared/process-studio.ts index 56acad140..6a4140846 100644 --- a/apps/peertube-runner/src/server/process/shared/process-studio.ts +++ b/apps/peertube-runner/src/server/process/shared/process-studio.ts @@ -86,7 +86,8 @@ export async function processStudioTranscoding (options: ProcessOptions { job: RunnerJob & { jobToken: string } diff --git a/packages/models/src/runners/index.ts b/packages/models/src/runners/index.ts index cfe997b64..34c8797bf 100644 --- a/packages/models/src/runners/index.ts +++ b/packages/models/src/runners/index.ts @@ -9,13 +9,13 @@ export * from './register-runner-body.model.js' export * from './register-runner-result.model.js' export * from './request-runner-job-body.model.js' export * from './request-runner-job-result.model.js' -export * from './runner-job-payload.model.js' -export * from './runner-job-private-payload.model.js' -export * from './runner-job-state.model.js' -export * from './runner-job-success-body.model.js' -export * from './runner-job-type.type.js' -export * from './runner-job-update-body.model.js' -export * from './runner-job.model.js' +export * from './runner-jobs/runner-job-payload.model.js' +export * from './runner-jobs/runner-job-private-payload.model.js' +export * from './runner-jobs/runner-job-state.model.js' +export * from './runner-jobs/runner-job-success-body.model.js' +export * from './runner-jobs/runner-job-type.type.js' +export * from './runner-jobs/runner-job-update-body.model.js' +export * from './runner-jobs/runner-job.model.js' export * from './runner-registration-token.js' export * from './runner.model.js' export * from './unregister-runner-body.model.js' diff --git a/packages/models/src/runners/list-runner-jobs-query.model.ts b/packages/models/src/runners/list-runner-jobs-query.model.ts index 395fe4b92..76f2ab27b 100644 --- a/packages/models/src/runners/list-runner-jobs-query.model.ts +++ b/packages/models/src/runners/list-runner-jobs-query.model.ts @@ -1,4 +1,4 @@ -import { RunnerJobStateType } from './runner-job-state.model.js' +import { RunnerJobStateType } from './runner-jobs/runner-job-state.model.js' export interface ListRunnerJobsQuery { start?: number diff --git a/packages/models/src/runners/request-runner-job-body.model.ts b/packages/models/src/runners/request-runner-job-body.model.ts index 16f59a022..dfbd11105 100644 --- a/packages/models/src/runners/request-runner-job-body.model.ts +++ b/packages/models/src/runners/request-runner-job-body.model.ts @@ -1,4 +1,4 @@ -import { RunnerJobType } from './runner-job-type.type.js' +import { RunnerJobType } from './runner-jobs/runner-job-type.type.js' export interface RequestRunnerJobBody { runnerToken: string diff --git a/packages/models/src/runners/request-runner-job-result.model.ts b/packages/models/src/runners/request-runner-job-result.model.ts index 30c8c640c..f52166047 100644 --- a/packages/models/src/runners/request-runner-job-result.model.ts +++ b/packages/models/src/runners/request-runner-job-result.model.ts @@ -1,5 +1,5 @@ -import { RunnerJobPayload } from './runner-job-payload.model.js' -import { RunnerJobType } from './runner-job-type.type.js' +import { RunnerJobPayload } from './runner-jobs/runner-job-payload.model.js' +import { RunnerJobType } from './runner-jobs/runner-job-type.type.js' export interface RequestRunnerJobResult

{ availableJobs: { diff --git a/packages/models/src/runners/runner-job-payload.model.ts b/packages/models/src/runners/runner-jobs/runner-job-payload.model.ts similarity index 67% rename from packages/models/src/runners/runner-job-payload.model.ts rename to packages/models/src/runners/runner-jobs/runner-job-payload.model.ts index dc0fb3168..ddc8a33ec 100644 --- a/packages/models/src/runners/runner-job-payload.model.ts +++ b/packages/models/src/runners/runner-jobs/runner-job-payload.model.ts @@ -1,4 +1,9 @@ -import { VideoStudioTaskPayload } from '../server/index.js' +import { VideoStudioTaskPayload } from '../../server/index.js' + +export type RunnerJobCustomUpload = { + url: string + method?: 'PUT' | 'POST' // default 'PUT' +} export type RunnerJobVODPayload = RunnerJobVODWebVideoTranscodingPayload | @@ -22,6 +27,9 @@ export interface RunnerJobVODWebVideoTranscodingPayload { output: { resolution: number fps: number + + // To upload on an external URL + videoFileCustomUpload?: RunnerJobCustomUpload } } @@ -35,6 +43,10 @@ export interface RunnerJobVODHLSTranscodingPayload { resolution: number fps: number separatedAudio: boolean + + // To upload on an external URL + videoFileCustomUpload?: RunnerJobCustomUpload + resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload } } @@ -47,6 +59,9 @@ export interface RunnerJobVODAudioMergeTranscodingPayload { output: { resolution: number fps: number + + // To upload on an external URL + videoFileCustomUpload?: RunnerJobCustomUpload } } @@ -57,12 +72,22 @@ export interface RunnerJobStudioTranscodingPayload { } tasks: VideoStudioTaskPayload[] + + output: { + // To upload on an external URL + videoFileCustomUpload?: RunnerJobCustomUpload + } } export interface RunnerJobTranscriptionPayload { input: { videoFileUrl: string } + + output: { + // To upload on an external URL + vttFileCustomUpload?: RunnerJobCustomUpload + } } // --------------------------------------------------------------------------- @@ -86,5 +111,10 @@ export interface RunnerJobLiveRTMPHLSTranscodingPayload { segmentDuration: number segmentListSize: number + + // To upload on an external URL + masterPlaylistFileCustomUpload?: RunnerJobCustomUpload + resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload + videoChunkFileCustomUpload?: RunnerJobCustomUpload } } diff --git a/packages/models/src/runners/runner-job-private-payload.model.ts b/packages/models/src/runners/runner-jobs/runner-job-private-payload.model.ts similarity index 96% rename from packages/models/src/runners/runner-job-private-payload.model.ts rename to packages/models/src/runners/runner-jobs/runner-job-private-payload.model.ts index 3a1398bc7..6caf38fcd 100644 --- a/packages/models/src/runners/runner-job-private-payload.model.ts +++ b/packages/models/src/runners/runner-jobs/runner-job-private-payload.model.ts @@ -1,4 +1,4 @@ -import { VideoStudioTaskPayload } from '../server/index.js' +import { VideoStudioTaskPayload } from '../../server/index.js' export type RunnerJobVODPrivatePayload = RunnerJobVODWebVideoTranscodingPrivatePayload | diff --git a/packages/models/src/runners/runner-job-state.model.ts b/packages/models/src/runners/runner-jobs/runner-job-state.model.ts similarity index 100% rename from packages/models/src/runners/runner-job-state.model.ts rename to packages/models/src/runners/runner-jobs/runner-job-state.model.ts diff --git a/packages/models/src/runners/runner-job-success-body.model.ts b/packages/models/src/runners/runner-jobs/runner-job-success-body.model.ts similarity index 100% rename from packages/models/src/runners/runner-job-success-body.model.ts rename to packages/models/src/runners/runner-jobs/runner-job-success-body.model.ts diff --git a/packages/models/src/runners/runner-job-type.type.ts b/packages/models/src/runners/runner-jobs/runner-job-type.type.ts similarity index 100% rename from packages/models/src/runners/runner-job-type.type.ts rename to packages/models/src/runners/runner-jobs/runner-job-type.type.ts diff --git a/packages/models/src/runners/runner-job-update-body.model.ts b/packages/models/src/runners/runner-jobs/runner-job-update-body.model.ts similarity index 100% rename from packages/models/src/runners/runner-job-update-body.model.ts rename to packages/models/src/runners/runner-jobs/runner-job-update-body.model.ts diff --git a/packages/models/src/runners/runner-job.model.ts b/packages/models/src/runners/runner-jobs/runner-job.model.ts similarity index 95% rename from packages/models/src/runners/runner-job.model.ts rename to packages/models/src/runners/runner-jobs/runner-job.model.ts index 6d6427396..54ba906f0 100644 --- a/packages/models/src/runners/runner-job.model.ts +++ b/packages/models/src/runners/runner-jobs/runner-job.model.ts @@ -1,4 +1,4 @@ -import { VideoConstant } from '../videos/index.js' +import { VideoConstant } from '../../videos/index.js' import { RunnerJobPayload } from './runner-job-payload.model.js' import { RunnerJobPrivatePayload } from './runner-job-private-payload.model.js' import { RunnerJobStateType } from './runner-job-state.model.js' diff --git a/packages/server-commands/src/runners/runner-jobs-command.ts b/packages/server-commands/src/runners/runner-jobs-command.ts index 0aca00cec..aaec00790 100644 --- a/packages/server-commands/src/runners/runner-jobs-command.ts +++ b/packages/server-commands/src/runners/runner-jobs-command.ts @@ -10,14 +10,18 @@ import { RequestRunnerJobResult, ResultList, RunnerJobAdmin, + RunnerJobCustomUpload, RunnerJobLiveRTMPHLSTranscodingPayload, RunnerJobPayload, RunnerJobState, RunnerJobStateType, RunnerJobSuccessBody, RunnerJobSuccessPayload, + RunnerJobTranscriptionPayload, RunnerJobType, RunnerJobUpdateBody, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODHLSTranscodingPayload, RunnerJobVODPayload, TranscriptionSuccess, VODHLSTranscodingSuccess, @@ -133,39 +137,6 @@ export class RunnerJobsCommand extends AbstractCommand { }) } - update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) { - const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update' - - const { payload } = options - const attaches: { [id: string]: any } = {} - let payloadWithoutFiles = payload - - if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) { - if (payload.masterPlaylistFile) { - attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile - } - - attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile - attaches[`payload[videoChunkFile]`] = payload.videoChunkFile - - payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ]) - } - - return this.postUploadRequest({ - ...options, - - path, - fields: { - ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]), - - payload: payloadWithoutFiles - }, - attaches, - implicitToken: false, - defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 - }) - } - error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) { const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error' @@ -179,32 +150,123 @@ export class RunnerJobsCommand extends AbstractCommand { }) } - success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) { + // --------------------------------------------------------------------------- + + update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string, reqPayload?: RunnerJobPayload }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update' + + const { payload } = options + const attaches: { [id: string]: any } = {} + const customUploads: (RunnerJobCustomUpload & { file: Blob | string })[] = [] + + let payloadWithoutFiles = payload + + if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) { + const reqPayload = options.reqPayload as RunnerJobLiveRTMPHLSTranscodingPayload + + if (payload.masterPlaylistFile) { + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.masterPlaylistFile, + attachName: 'masterPlaylistFile', + customUpload: reqPayload?.output?.masterPlaylistFileCustomUpload + }) + + attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile + } + + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.resolutionPlaylistFile, + attachName: 'resolutionPlaylistFile', + customUpload: reqPayload?.output?.resolutionPlaylistFileCustomUpload + }) + + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.videoChunkFile, + attachName: 'videoChunkFile', + customUpload: reqPayload?.output?.videoChunkFileCustomUpload + }) + + payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ]) + } + + return this.uploadRunnerJobRequest({ + ...options, + + path, + fields: { + ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]), + + payload: payloadWithoutFiles + }, + attaches, + customUploads + }) + } + + success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string, reqPayload?: RunnerJobPayload }) { const { payload } = options const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success' const attaches: { [id: string]: any } = {} + const customUploads: (RunnerJobCustomUpload & { file: Blob | string })[] = [] + let payloadWithoutFiles = payload if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) { - attaches[`payload[videoFile]`] = payload.videoFile + const reqPayload = options.reqPayload as RunnerJobVODAudioMergeTranscodingPayload | RunnerJobVODHLSTranscodingPayload + + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.videoFile, + attachName: 'videoFile', + customUpload: reqPayload?.output?.videoFileCustomUpload + }) payloadWithoutFiles = omit(payloadWithoutFiles as VODWebVideoTranscodingSuccess, [ 'videoFile' ]) } if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) { - attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile + const reqPayload = options.reqPayload as RunnerJobVODHLSTranscodingPayload + + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.resolutionPlaylistFile, + attachName: 'resolutionPlaylistFile', + customUpload: reqPayload?.output?.resolutionPlaylistFileCustomUpload + }) payloadWithoutFiles = omit(payloadWithoutFiles as VODHLSTranscodingSuccess, [ 'resolutionPlaylistFile' ]) } if (isTranscriptionPayloadSuccess(payload) && payload.vttFile) { - attaches[`payload[vttFile]`] = payload.vttFile + const reqPayload = options.reqPayload as RunnerJobTranscriptionPayload + + this.updateUploadPayloads({ + attachesStore: attaches, + customUploadsStore: customUploads, + + file: payload.vttFile, + attachName: 'vttFile', + customUpload: reqPayload?.output?.vttFileCustomUpload + }) payloadWithoutFiles = omit(payloadWithoutFiles as TranscriptionSuccess, [ 'vttFile' ]) } - return this.postUploadRequest({ + return this.uploadRunnerJobRequest({ ...options, path, @@ -214,11 +276,63 @@ export class RunnerJobsCommand extends AbstractCommand { payload: payloadWithoutFiles }, + customUploads + }) + } + + private updateUploadPayloads (options: { + file: Blob | string + customUpload?: RunnerJobCustomUpload + attachName: string + + attachesStore: Record + customUploadsStore: (RunnerJobCustomUpload & { file: Blob | string })[] + }) { + if (options.customUpload) { + options.customUploadsStore.push({ ...options.customUpload, file: options.file }) + } else { + options.attachesStore[`payload[${options.attachName}]`] = options.file + } + } + + private async uploadRunnerJobRequest (options: OverrideCommandOptions & { + path: string + + fields: { [ fieldName: string ]: any } + attaches: { [ fieldName: string ]: any } + + customUploads?: (RunnerJobCustomUpload & { file: string | Blob })[] + }) { + for (const customUpload of (options.customUploads || [])) { + await this.customUpload(customUpload) + } + + await this.postUploadRequest({ + ...omit(options, [ 'customUploads' ]), + implicitToken: false, defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 }) } + private customUpload (options: RunnerJobCustomUpload & { file: Blob | string }) { + const parsedUrl = new URL(options.url) + + const reqOptions = { + url: parsedUrl.origin, + path: parsedUrl.pathname, + attaches: { file: options.file }, + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + } + + if (options.method === 'POST') return this.postUploadRequest(reqOptions) + + return this.putUploadRequest(reqOptions) + } + + // --------------------------------------------------------------------------- + getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) { const { host, protocol, pathname } = new URL(options.url) @@ -256,7 +370,7 @@ export class RunnerJobsCommand extends AbstractCommand { const jobToken = job.jobToken const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' } - await this.success({ runnerToken, jobUUID, jobToken, payload }) + await this.success({ runnerToken, jobUUID, jobToken, payload, reqPayload: undefined }) await waitJobs([ this.server ]) diff --git a/packages/tests/src/peertube-runner/custom-upload.ts b/packages/tests/src/peertube-runner/custom-upload.ts new file mode 100644 index 000000000..336ed5664 --- /dev/null +++ b/packages/tests/src/peertube-runner/custom-upload.ts @@ -0,0 +1,122 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import { HttpStatusCode, RunnerJobState, RunnerJobVODPayload } from '@peertube/peertube-models' +import { + cleanupTests, + createSingleServer, + makeGetRequest, + PeerTubeServer, + setAccessTokensToServers, + setDefaultVideoChannel, + waitJobs +} from '@peertube/peertube-server-commands' +import { MockUpload } from '@tests/shared/mock-servers/mock-upload.js' +import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js' +import { SQLCommand } from '@tests/shared/sql-command.js' +import { wait } from '../../../core-utils/src/common/time.js' + +describe('Test peertube-runner custom upload', function () { + let server: PeerTubeServer + let peertubeRunner: PeerTubeRunnerProcess + + let sqlCommand: SQLCommand + let mockUploadServerUrl: string + let transcoded: string + + const mockUpload = new MockUpload() + + async function registerRunner () { + const registrationToken = await server.runnerRegistrationTokens.getFirstRegistrationToken() + await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' }) + } + + async function unregisterRunner () { + await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' }) + } + + async function updatePayload (method?: 'PUT' | 'POST') { + const { data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] }) + + for (const job of data) { + const payload = job.payload as RunnerJobVODPayload + + payload.output.videoFileCustomUpload = { + method, + url: mockUploadServerUrl + '/upload-file' + } + + await sqlCommand.setRunnerJobPayload(job.uuid, payload) + } + } + + before(async function () { + this.timeout(120_000) + + server = await createSingleServer(1) + + await setAccessTokensToServers([ server ]) + await setDefaultVideoChannel([ server ]) + + await server.config.enableTranscoding() + const { uuid } = await server.videos.quickUpload({ name: 'transcoded' }) + transcoded = uuid + await waitJobs([ server ]) + + await server.config.enableRemoteTranscoding() + + peertubeRunner = new PeerTubeRunnerProcess(server) + await peertubeRunner.runServer() + + const uploadPort = await mockUpload.initialize() + mockUploadServerUrl = 'http://127.0.0.1:' + uploadPort + + sqlCommand = new SQLCommand(server) + }) + + it('Should upload the file on another endpoint for web video', async function () { + await server.videos.quickUpload({ name: 'video 1' }) + await server.videos.quickUpload({ name: 'video 2' }) + await waitJobs([ server ]) + + await updatePayload('POST') + await registerRunner() + + do { + const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 }) + + // 2 x 5 retries because the server doesn't accept non existing files + if (body.length === 10 && body.every(f => f.method === 'POST')) break + await wait(500) + } while (true) + + await unregisterRunner() + mockUpload.cleanUpload() + }) + + it('Should upload the file on another endpoint for HLS', async function () { + await server.videos.runTranscoding({ transcodingType: 'hls', videoId: transcoded }) + await waitJobs([ server ]) + + await updatePayload() + await registerRunner() + + do { + const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 }) + + // 5 retries because the server doesn't accept non existing files + if (body.length === 5 && body.every(f => f.method === 'PUT')) break + await wait(500) + } while (true) + + await unregisterRunner() + mockUpload.cleanUpload() + }) + + after(async function () { + peertubeRunner.kill() + + await mockUpload.terminate() + await sqlCommand.cleanup() + await cleanupTests([ server ]) + }) +}) diff --git a/packages/tests/src/peertube-runner/index.ts b/packages/tests/src/peertube-runner/index.ts index c046e1ed2..41758deec 100644 --- a/packages/tests/src/peertube-runner/index.ts +++ b/packages/tests/src/peertube-runner/index.ts @@ -1,4 +1,5 @@ export * from './client-cli.js' +export * from './custom-upload.js' export * from './live-transcoding.js' export * from './replace-file.js' export * from './shutdown.js' diff --git a/packages/tests/src/shared/mock-servers/mock-upload.ts b/packages/tests/src/shared/mock-servers/mock-upload.ts new file mode 100644 index 000000000..9f25cd0f9 --- /dev/null +++ b/packages/tests/src/shared/mock-servers/mock-upload.ts @@ -0,0 +1,42 @@ +import express from 'express' +import { Server } from 'http' +import multer from 'multer' +import { getPort, randomListen, terminateServer } from './shared.js' +import { HttpStatusCode } from '../../../../models/src/http/http-status-codes.js' + +export class MockUpload { + private server: Server + + private uploads: { method: string, file: Buffer }[] = [] + + async initialize () { + const app = express() + + app.all( + '/upload-file', + multer({ storage: multer.memoryStorage() }).single('file'), + (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (process.env.DEBUG) console.log('Receiving request on upload mock server.', req.url) + + this.uploads.push({ method: req.method, file: req.file.buffer }) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) + }) + + app.get('/uploaded-files', (req: express.Request, res: express.Response) => { + return res.json(this.uploads) + }) + + this.server = await randomListen(app) + + return getPort(this.server) + } + + cleanUpload () { + this.uploads = [] + } + + terminate () { + return terminateServer(this.server) + } +} diff --git a/packages/tests/src/shared/peertube-runner-process.ts b/packages/tests/src/shared/peertube-runner-process.ts index 37ea350fd..e215fa826 100644 --- a/packages/tests/src/shared/peertube-runner-process.ts +++ b/packages/tests/src/shared/peertube-runner-process.ts @@ -1,9 +1,9 @@ +import { RunnerJobType } from '@peertube/peertube-models' +import { root } from '@peertube/peertube-node-utils' +import { PeerTubeServer } from '@peertube/peertube-server-commands' import { ChildProcess, fork, ForkOptions } from 'child_process' import { execaNode } from 'execa' import { join } from 'path' -import { root } from '@peertube/peertube-node-utils' -import { PeerTubeServer } from '@peertube/peertube-server-commands' -import { RunnerJobType } from '../../../models/src/runners/runner-job-type.type.js' export class PeerTubeRunnerProcess { private app?: ChildProcess diff --git a/packages/tests/src/shared/sql-command.ts b/packages/tests/src/shared/sql-command.ts index 2073efc2a..6c0365552 100644 --- a/packages/tests/src/shared/sql-command.ts +++ b/packages/tests/src/shared/sql-command.ts @@ -1,7 +1,7 @@ -import { QueryTypes, Sequelize } from 'sequelize' import { forceNumber } from '@peertube/peertube-core-utils' +import { FileStorageType, RunnerJobPayload } from '@peertube/peertube-models' import { PeerTubeServer } from '@peertube/peertube-server-commands' -import { FileStorageType } from '@peertube/peertube-models' +import { QueryTypes, Sequelize } from 'sequelize' export class SQLCommand { private sequelize: Sequelize @@ -141,6 +141,17 @@ export class SQLCommand { ) } + // --------------------------------------------------------------------------- + + setRunnerJobPayload (uuid: string, payload: RunnerJobPayload) { + return this.updateQuery( + `UPDATE "runnerJob" SET "payload" = :payload WHERE "uuid" = :uuid`, + { uuid, payload: JSON.stringify(payload) } + ) + } + + // --------------------------------------------------------------------------- + async cleanup () { if (!this.sequelize) return diff --git a/server/core/lib/runners/job-handlers/transcription-job-handler.ts b/server/core/lib/runners/job-handlers/transcription-job-handler.ts index 49bbf2bfe..9a87b3d4a 100644 --- a/server/core/lib/runners/job-handlers/transcription-job-handler.ts +++ b/server/core/lib/runners/job-handlers/transcription-job-handler.ts @@ -60,7 +60,8 @@ export class TranscriptionJobHandler extends AbstractJobHandler { if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) { return {