mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-03 09:49:20 +02:00
Add custom upload ability for runners
This commit is contained in:
parent
33a68f74dd
commit
5b4c7fc20d
25 changed files with 421 additions and 87 deletions
|
@ -61,8 +61,12 @@ export function scheduleTranscodingProgress (options: {
|
||||||
: 60000
|
: 60000
|
||||||
|
|
||||||
const update = () => {
|
const update = () => {
|
||||||
server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
|
server.runnerJobs.update({
|
||||||
.catch(err => logger.error({ err }, 'Cannot send job progress'))
|
jobToken: job.jobToken,
|
||||||
|
jobUUID: job.uuid,
|
||||||
|
runnerToken,
|
||||||
|
progress: progressGetter()
|
||||||
|
}).catch(err => logger.error({ err }, 'Cannot send job progress'))
|
||||||
}
|
}
|
||||||
|
|
||||||
const interval = setInterval(() => {
|
const interval = setInterval(() => {
|
||||||
|
|
|
@ -50,22 +50,23 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
|
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
process () {
|
private get payload () {
|
||||||
const job = this.options.job
|
return this.options.job.payload
|
||||||
const payload = job.payload
|
}
|
||||||
|
|
||||||
|
process () {
|
||||||
return new Promise<void>(async (res, rej) => {
|
return new Promise<void>(async (res, rej) => {
|
||||||
try {
|
try {
|
||||||
await ensureDir(this.outputPath)
|
await ensureDir(this.outputPath)
|
||||||
|
|
||||||
logger.info(`Probing ${payload.input.rtmpUrl}`)
|
logger.info(`Probing ${this.payload.input.rtmpUrl}`)
|
||||||
const probe = await ffprobePromise(payload.input.rtmpUrl)
|
const probe = await ffprobePromise(this.payload.input.rtmpUrl)
|
||||||
logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
|
logger.info({ probe }, `Probed ${this.payload.input.rtmpUrl}`)
|
||||||
|
|
||||||
const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
|
const hasAudio = await hasAudioStream(this.payload.input.rtmpUrl, probe)
|
||||||
const hasVideo = await hasVideoStream(payload.input.rtmpUrl, probe)
|
const hasVideo = await hasVideoStream(this.payload.input.rtmpUrl, probe)
|
||||||
const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
|
const bitrate = await getVideoStreamBitrate(this.payload.input.rtmpUrl, probe)
|
||||||
const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
|
const { ratio } = await getVideoStreamDimensionsInfo(this.payload.input.rtmpUrl, probe)
|
||||||
|
|
||||||
const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
|
const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
|
||||||
this.fsWatchers.push(m3u8Watcher)
|
this.fsWatchers.push(m3u8Watcher)
|
||||||
|
@ -107,15 +108,15 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
})
|
})
|
||||||
|
|
||||||
this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
|
this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
|
||||||
inputUrl: payload.input.rtmpUrl,
|
inputUrl: this.payload.input.rtmpUrl,
|
||||||
|
|
||||||
outPath: this.outputPath,
|
outPath: this.outputPath,
|
||||||
masterPlaylistName: 'master.m3u8',
|
masterPlaylistName: 'master.m3u8',
|
||||||
|
|
||||||
segmentListSize: payload.output.segmentListSize,
|
segmentListSize: this.payload.output.segmentListSize,
|
||||||
segmentDuration: payload.output.segmentDuration,
|
segmentDuration: this.payload.output.segmentDuration,
|
||||||
|
|
||||||
toTranscode: payload.output.toTranscode,
|
toTranscode: this.payload.output.toTranscode,
|
||||||
splitAudioAndVideo: true,
|
splitAudioAndVideo: true,
|
||||||
|
|
||||||
bitrate,
|
bitrate,
|
||||||
|
@ -126,7 +127,7 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
probe
|
probe
|
||||||
})
|
})
|
||||||
|
|
||||||
logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
|
logger.info(`Running live transcoding for ${this.payload.input.rtmpUrl}`)
|
||||||
|
|
||||||
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
|
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
|
||||||
this.onFFmpegError({ err, stdout, stderr })
|
this.onFFmpegError({ err, stdout, stderr })
|
||||||
|
@ -241,7 +242,8 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
jobToken: this.options.job.jobToken,
|
jobToken: this.options.job.jobToken,
|
||||||
jobUUID: this.options.job.uuid,
|
jobUUID: this.options.job.uuid,
|
||||||
runnerToken: this.options.runnerToken,
|
runnerToken: this.options.runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: this.payload
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,7 +326,7 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
await Promise.all(parallelPromises)
|
await Promise.all(parallelPromises)
|
||||||
}
|
}
|
||||||
|
|
||||||
private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
|
private async updateWithRetry (updatePayload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
|
||||||
if (this.ended || this.errored) return
|
if (this.ended || this.errored) return
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -332,7 +334,8 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
jobToken: this.options.job.jobToken,
|
jobToken: this.options.job.jobToken,
|
||||||
jobUUID: this.options.job.uuid,
|
jobUUID: this.options.job.uuid,
|
||||||
runnerToken: this.options.runnerToken,
|
runnerToken: this.options.runnerToken,
|
||||||
payload: payload as any
|
payload: updatePayload as any,
|
||||||
|
reqPayload: this.payload
|
||||||
})
|
})
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (currentTry >= 3) throw err
|
if (currentTry >= 3) throw err
|
||||||
|
@ -341,7 +344,7 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
logger.warn({ err }, 'Will retry update after error')
|
logger.warn({ err }, 'Will retry update after error')
|
||||||
await wait(250)
|
await wait(250)
|
||||||
|
|
||||||
return this.updateWithRetry(payload, currentTry + 1)
|
return this.updateWithRetry(updatePayload, currentTry + 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,8 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
|
||||||
jobToken: job.jobToken,
|
jobToken: job.jobToken,
|
||||||
jobUUID: job.uuid,
|
jobUUID: job.uuid,
|
||||||
runnerToken,
|
runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: payload
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
if (tmpVideoInputFilePath) await remove(tmpVideoInputFilePath)
|
if (tmpVideoInputFilePath) await remove(tmpVideoInputFilePath)
|
||||||
|
|
|
@ -69,7 +69,8 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
|
||||||
jobToken: job.jobToken,
|
jobToken: job.jobToken,
|
||||||
jobUUID: job.uuid,
|
jobUUID: job.uuid,
|
||||||
runnerToken,
|
runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: payload
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
if (inputPath) await remove(inputPath)
|
if (inputPath) await remove(inputPath)
|
||||||
|
|
|
@ -71,7 +71,8 @@ export async function processWebVideoTranscoding (options: ProcessOptions<Runner
|
||||||
jobToken: job.jobToken,
|
jobToken: job.jobToken,
|
||||||
jobUUID: job.uuid,
|
jobUUID: job.uuid,
|
||||||
runnerToken,
|
runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: payload
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
if (videoInputPath) await remove(videoInputPath)
|
if (videoInputPath) await remove(videoInputPath)
|
||||||
|
@ -139,7 +140,8 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
|
||||||
jobToken: job.jobToken,
|
jobToken: job.jobToken,
|
||||||
jobUUID: job.uuid,
|
jobUUID: job.uuid,
|
||||||
runnerToken,
|
runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: payload
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
if (videoInputPath) await remove(videoInputPath)
|
if (videoInputPath) await remove(videoInputPath)
|
||||||
|
@ -207,7 +209,8 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
|
||||||
jobToken: job.jobToken,
|
jobToken: job.jobToken,
|
||||||
jobUUID: job.uuid,
|
jobUUID: job.uuid,
|
||||||
runnerToken,
|
runnerToken,
|
||||||
payload: successBody
|
payload: successBody,
|
||||||
|
reqPayload: payload
|
||||||
})
|
})
|
||||||
} finally {
|
} finally {
|
||||||
if (audioPath) await remove(audioPath)
|
if (audioPath) await remove(audioPath)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { RunnerJobPayload } from './runner-job-payload.model.js'
|
import { RunnerJobPayload } from './runner-jobs/runner-job-payload.model.js'
|
||||||
import { RunnerJob } from './runner-job.model.js'
|
import { RunnerJob } from './runner-jobs/runner-job.model.js'
|
||||||
|
|
||||||
export interface AcceptRunnerJobResult <T extends RunnerJobPayload = RunnerJobPayload> {
|
export interface AcceptRunnerJobResult <T extends RunnerJobPayload = RunnerJobPayload> {
|
||||||
job: RunnerJob<T> & { jobToken: string }
|
job: RunnerJob<T> & { jobToken: string }
|
||||||
|
|
|
@ -9,13 +9,13 @@ export * from './register-runner-body.model.js'
|
||||||
export * from './register-runner-result.model.js'
|
export * from './register-runner-result.model.js'
|
||||||
export * from './request-runner-job-body.model.js'
|
export * from './request-runner-job-body.model.js'
|
||||||
export * from './request-runner-job-result.model.js'
|
export * from './request-runner-job-result.model.js'
|
||||||
export * from './runner-job-payload.model.js'
|
export * from './runner-jobs/runner-job-payload.model.js'
|
||||||
export * from './runner-job-private-payload.model.js'
|
export * from './runner-jobs/runner-job-private-payload.model.js'
|
||||||
export * from './runner-job-state.model.js'
|
export * from './runner-jobs/runner-job-state.model.js'
|
||||||
export * from './runner-job-success-body.model.js'
|
export * from './runner-jobs/runner-job-success-body.model.js'
|
||||||
export * from './runner-job-type.type.js'
|
export * from './runner-jobs/runner-job-type.type.js'
|
||||||
export * from './runner-job-update-body.model.js'
|
export * from './runner-jobs/runner-job-update-body.model.js'
|
||||||
export * from './runner-job.model.js'
|
export * from './runner-jobs/runner-job.model.js'
|
||||||
export * from './runner-registration-token.js'
|
export * from './runner-registration-token.js'
|
||||||
export * from './runner.model.js'
|
export * from './runner.model.js'
|
||||||
export * from './unregister-runner-body.model.js'
|
export * from './unregister-runner-body.model.js'
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { RunnerJobStateType } from './runner-job-state.model.js'
|
import { RunnerJobStateType } from './runner-jobs/runner-job-state.model.js'
|
||||||
|
|
||||||
export interface ListRunnerJobsQuery {
|
export interface ListRunnerJobsQuery {
|
||||||
start?: number
|
start?: number
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { RunnerJobType } from './runner-job-type.type.js'
|
import { RunnerJobType } from './runner-jobs/runner-job-type.type.js'
|
||||||
|
|
||||||
export interface RequestRunnerJobBody {
|
export interface RequestRunnerJobBody {
|
||||||
runnerToken: string
|
runnerToken: string
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { RunnerJobPayload } from './runner-job-payload.model.js'
|
import { RunnerJobPayload } from './runner-jobs/runner-job-payload.model.js'
|
||||||
import { RunnerJobType } from './runner-job-type.type.js'
|
import { RunnerJobType } from './runner-jobs/runner-job-type.type.js'
|
||||||
|
|
||||||
export interface RequestRunnerJobResult <P extends RunnerJobPayload = RunnerJobPayload> {
|
export interface RequestRunnerJobResult <P extends RunnerJobPayload = RunnerJobPayload> {
|
||||||
availableJobs: {
|
availableJobs: {
|
||||||
|
|
|
@ -1,4 +1,9 @@
|
||||||
import { VideoStudioTaskPayload } from '../server/index.js'
|
import { VideoStudioTaskPayload } from '../../server/index.js'
|
||||||
|
|
||||||
|
export type RunnerJobCustomUpload = {
|
||||||
|
url: string
|
||||||
|
method?: 'PUT' | 'POST' // default 'PUT'
|
||||||
|
}
|
||||||
|
|
||||||
export type RunnerJobVODPayload =
|
export type RunnerJobVODPayload =
|
||||||
RunnerJobVODWebVideoTranscodingPayload |
|
RunnerJobVODWebVideoTranscodingPayload |
|
||||||
|
@ -22,6 +27,9 @@ export interface RunnerJobVODWebVideoTranscodingPayload {
|
||||||
output: {
|
output: {
|
||||||
resolution: number
|
resolution: number
|
||||||
fps: number
|
fps: number
|
||||||
|
|
||||||
|
// To upload on an external URL
|
||||||
|
videoFileCustomUpload?: RunnerJobCustomUpload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,6 +43,10 @@ export interface RunnerJobVODHLSTranscodingPayload {
|
||||||
resolution: number
|
resolution: number
|
||||||
fps: number
|
fps: number
|
||||||
separatedAudio: boolean
|
separatedAudio: boolean
|
||||||
|
|
||||||
|
// To upload on an external URL
|
||||||
|
videoFileCustomUpload?: RunnerJobCustomUpload
|
||||||
|
resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,6 +59,9 @@ export interface RunnerJobVODAudioMergeTranscodingPayload {
|
||||||
output: {
|
output: {
|
||||||
resolution: number
|
resolution: number
|
||||||
fps: number
|
fps: number
|
||||||
|
|
||||||
|
// To upload on an external URL
|
||||||
|
videoFileCustomUpload?: RunnerJobCustomUpload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,12 +72,22 @@ export interface RunnerJobStudioTranscodingPayload {
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks: VideoStudioTaskPayload[]
|
tasks: VideoStudioTaskPayload[]
|
||||||
|
|
||||||
|
output: {
|
||||||
|
// To upload on an external URL
|
||||||
|
videoFileCustomUpload?: RunnerJobCustomUpload
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface RunnerJobTranscriptionPayload {
|
export interface RunnerJobTranscriptionPayload {
|
||||||
input: {
|
input: {
|
||||||
videoFileUrl: string
|
videoFileUrl: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
output: {
|
||||||
|
// To upload on an external URL
|
||||||
|
vttFileCustomUpload?: RunnerJobCustomUpload
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
@ -86,5 +111,10 @@ export interface RunnerJobLiveRTMPHLSTranscodingPayload {
|
||||||
|
|
||||||
segmentDuration: number
|
segmentDuration: number
|
||||||
segmentListSize: number
|
segmentListSize: number
|
||||||
|
|
||||||
|
// To upload on an external URL
|
||||||
|
masterPlaylistFileCustomUpload?: RunnerJobCustomUpload
|
||||||
|
resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload
|
||||||
|
videoChunkFileCustomUpload?: RunnerJobCustomUpload
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
import { VideoStudioTaskPayload } from '../server/index.js'
|
import { VideoStudioTaskPayload } from '../../server/index.js'
|
||||||
|
|
||||||
export type RunnerJobVODPrivatePayload =
|
export type RunnerJobVODPrivatePayload =
|
||||||
RunnerJobVODWebVideoTranscodingPrivatePayload |
|
RunnerJobVODWebVideoTranscodingPrivatePayload |
|
|
@ -1,4 +1,4 @@
|
||||||
import { VideoConstant } from '../videos/index.js'
|
import { VideoConstant } from '../../videos/index.js'
|
||||||
import { RunnerJobPayload } from './runner-job-payload.model.js'
|
import { RunnerJobPayload } from './runner-job-payload.model.js'
|
||||||
import { RunnerJobPrivatePayload } from './runner-job-private-payload.model.js'
|
import { RunnerJobPrivatePayload } from './runner-job-private-payload.model.js'
|
||||||
import { RunnerJobStateType } from './runner-job-state.model.js'
|
import { RunnerJobStateType } from './runner-job-state.model.js'
|
|
@ -10,14 +10,18 @@ import {
|
||||||
RequestRunnerJobResult,
|
RequestRunnerJobResult,
|
||||||
ResultList,
|
ResultList,
|
||||||
RunnerJobAdmin,
|
RunnerJobAdmin,
|
||||||
|
RunnerJobCustomUpload,
|
||||||
RunnerJobLiveRTMPHLSTranscodingPayload,
|
RunnerJobLiveRTMPHLSTranscodingPayload,
|
||||||
RunnerJobPayload,
|
RunnerJobPayload,
|
||||||
RunnerJobState,
|
RunnerJobState,
|
||||||
RunnerJobStateType,
|
RunnerJobStateType,
|
||||||
RunnerJobSuccessBody,
|
RunnerJobSuccessBody,
|
||||||
RunnerJobSuccessPayload,
|
RunnerJobSuccessPayload,
|
||||||
|
RunnerJobTranscriptionPayload,
|
||||||
RunnerJobType,
|
RunnerJobType,
|
||||||
RunnerJobUpdateBody,
|
RunnerJobUpdateBody,
|
||||||
|
RunnerJobVODAudioMergeTranscodingPayload,
|
||||||
|
RunnerJobVODHLSTranscodingPayload,
|
||||||
RunnerJobVODPayload,
|
RunnerJobVODPayload,
|
||||||
TranscriptionSuccess,
|
TranscriptionSuccess,
|
||||||
VODHLSTranscodingSuccess,
|
VODHLSTranscodingSuccess,
|
||||||
|
@ -133,39 +137,6 @@ export class RunnerJobsCommand extends AbstractCommand {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
|
|
||||||
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
|
|
||||||
|
|
||||||
const { payload } = options
|
|
||||||
const attaches: { [id: string]: any } = {}
|
|
||||||
let payloadWithoutFiles = payload
|
|
||||||
|
|
||||||
if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
|
|
||||||
if (payload.masterPlaylistFile) {
|
|
||||||
attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
|
|
||||||
}
|
|
||||||
|
|
||||||
attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
|
|
||||||
attaches[`payload[videoChunkFile]`] = payload.videoChunkFile
|
|
||||||
|
|
||||||
payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
|
|
||||||
}
|
|
||||||
|
|
||||||
return this.postUploadRequest({
|
|
||||||
...options,
|
|
||||||
|
|
||||||
path,
|
|
||||||
fields: {
|
|
||||||
...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
|
|
||||||
|
|
||||||
payload: payloadWithoutFiles
|
|
||||||
},
|
|
||||||
attaches,
|
|
||||||
implicitToken: false,
|
|
||||||
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
|
error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
|
||||||
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
|
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
|
||||||
|
|
||||||
|
@ -179,32 +150,123 @@ export class RunnerJobsCommand extends AbstractCommand {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string, reqPayload?: RunnerJobPayload }) {
|
||||||
|
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
|
||||||
|
|
||||||
|
const { payload } = options
|
||||||
|
const attaches: { [id: string]: any } = {}
|
||||||
|
const customUploads: (RunnerJobCustomUpload & { file: Blob | string })[] = []
|
||||||
|
|
||||||
|
let payloadWithoutFiles = payload
|
||||||
|
|
||||||
|
if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
|
||||||
|
const reqPayload = options.reqPayload as RunnerJobLiveRTMPHLSTranscodingPayload
|
||||||
|
|
||||||
|
if (payload.masterPlaylistFile) {
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.masterPlaylistFile,
|
||||||
|
attachName: 'masterPlaylistFile',
|
||||||
|
customUpload: reqPayload?.output?.masterPlaylistFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
|
attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
|
||||||
|
}
|
||||||
|
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.resolutionPlaylistFile,
|
||||||
|
attachName: 'resolutionPlaylistFile',
|
||||||
|
customUpload: reqPayload?.output?.resolutionPlaylistFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.videoChunkFile,
|
||||||
|
attachName: 'videoChunkFile',
|
||||||
|
customUpload: reqPayload?.output?.videoChunkFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
|
payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.uploadRunnerJobRequest({
|
||||||
|
...options,
|
||||||
|
|
||||||
|
path,
|
||||||
|
fields: {
|
||||||
|
...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
|
||||||
|
|
||||||
|
payload: payloadWithoutFiles
|
||||||
|
},
|
||||||
|
attaches,
|
||||||
|
customUploads
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string, reqPayload?: RunnerJobPayload }) {
|
||||||
const { payload } = options
|
const { payload } = options
|
||||||
|
|
||||||
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
|
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
|
||||||
const attaches: { [id: string]: any } = {}
|
const attaches: { [id: string]: any } = {}
|
||||||
|
const customUploads: (RunnerJobCustomUpload & { file: Blob | string })[] = []
|
||||||
|
|
||||||
let payloadWithoutFiles = payload
|
let payloadWithoutFiles = payload
|
||||||
|
|
||||||
if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
|
if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
|
||||||
attaches[`payload[videoFile]`] = payload.videoFile
|
const reqPayload = options.reqPayload as RunnerJobVODAudioMergeTranscodingPayload | RunnerJobVODHLSTranscodingPayload
|
||||||
|
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.videoFile,
|
||||||
|
attachName: 'videoFile',
|
||||||
|
customUpload: reqPayload?.output?.videoFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
payloadWithoutFiles = omit(payloadWithoutFiles as VODWebVideoTranscodingSuccess, [ 'videoFile' ])
|
payloadWithoutFiles = omit(payloadWithoutFiles as VODWebVideoTranscodingSuccess, [ 'videoFile' ])
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
|
if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
|
||||||
attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
|
const reqPayload = options.reqPayload as RunnerJobVODHLSTranscodingPayload
|
||||||
|
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.resolutionPlaylistFile,
|
||||||
|
attachName: 'resolutionPlaylistFile',
|
||||||
|
customUpload: reqPayload?.output?.resolutionPlaylistFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
payloadWithoutFiles = omit(payloadWithoutFiles as VODHLSTranscodingSuccess, [ 'resolutionPlaylistFile' ])
|
payloadWithoutFiles = omit(payloadWithoutFiles as VODHLSTranscodingSuccess, [ 'resolutionPlaylistFile' ])
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isTranscriptionPayloadSuccess(payload) && payload.vttFile) {
|
if (isTranscriptionPayloadSuccess(payload) && payload.vttFile) {
|
||||||
attaches[`payload[vttFile]`] = payload.vttFile
|
const reqPayload = options.reqPayload as RunnerJobTranscriptionPayload
|
||||||
|
|
||||||
|
this.updateUploadPayloads({
|
||||||
|
attachesStore: attaches,
|
||||||
|
customUploadsStore: customUploads,
|
||||||
|
|
||||||
|
file: payload.vttFile,
|
||||||
|
attachName: 'vttFile',
|
||||||
|
customUpload: reqPayload?.output?.vttFileCustomUpload
|
||||||
|
})
|
||||||
|
|
||||||
payloadWithoutFiles = omit(payloadWithoutFiles as TranscriptionSuccess, [ 'vttFile' ])
|
payloadWithoutFiles = omit(payloadWithoutFiles as TranscriptionSuccess, [ 'vttFile' ])
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.postUploadRequest({
|
return this.uploadRunnerJobRequest({
|
||||||
...options,
|
...options,
|
||||||
|
|
||||||
path,
|
path,
|
||||||
|
@ -214,11 +276,63 @@ export class RunnerJobsCommand extends AbstractCommand {
|
||||||
|
|
||||||
payload: payloadWithoutFiles
|
payload: payloadWithoutFiles
|
||||||
},
|
},
|
||||||
|
customUploads
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
private updateUploadPayloads (options: {
|
||||||
|
file: Blob | string
|
||||||
|
customUpload?: RunnerJobCustomUpload
|
||||||
|
attachName: string
|
||||||
|
|
||||||
|
attachesStore: Record<string, string | Blob>
|
||||||
|
customUploadsStore: (RunnerJobCustomUpload & { file: Blob | string })[]
|
||||||
|
}) {
|
||||||
|
if (options.customUpload) {
|
||||||
|
options.customUploadsStore.push({ ...options.customUpload, file: options.file })
|
||||||
|
} else {
|
||||||
|
options.attachesStore[`payload[${options.attachName}]`] = options.file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async uploadRunnerJobRequest (options: OverrideCommandOptions & {
|
||||||
|
path: string
|
||||||
|
|
||||||
|
fields: { [ fieldName: string ]: any }
|
||||||
|
attaches: { [ fieldName: string ]: any }
|
||||||
|
|
||||||
|
customUploads?: (RunnerJobCustomUpload & { file: string | Blob })[]
|
||||||
|
}) {
|
||||||
|
for (const customUpload of (options.customUploads || [])) {
|
||||||
|
await this.customUpload(customUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.postUploadRequest({
|
||||||
|
...omit(options, [ 'customUploads' ]),
|
||||||
|
|
||||||
implicitToken: false,
|
implicitToken: false,
|
||||||
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
|
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private customUpload (options: RunnerJobCustomUpload & { file: Blob | string }) {
|
||||||
|
const parsedUrl = new URL(options.url)
|
||||||
|
|
||||||
|
const reqOptions = {
|
||||||
|
url: parsedUrl.origin,
|
||||||
|
path: parsedUrl.pathname,
|
||||||
|
attaches: { file: options.file },
|
||||||
|
implicitToken: false,
|
||||||
|
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
|
||||||
|
}
|
||||||
|
|
||||||
|
if (options.method === 'POST') return this.postUploadRequest(reqOptions)
|
||||||
|
|
||||||
|
return this.putUploadRequest(reqOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
|
getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
|
||||||
const { host, protocol, pathname } = new URL(options.url)
|
const { host, protocol, pathname } = new URL(options.url)
|
||||||
|
|
||||||
|
@ -256,7 +370,7 @@ export class RunnerJobsCommand extends AbstractCommand {
|
||||||
const jobToken = job.jobToken
|
const jobToken = job.jobToken
|
||||||
|
|
||||||
const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
|
const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
|
||||||
await this.success({ runnerToken, jobUUID, jobToken, payload })
|
await this.success({ runnerToken, jobUUID, jobToken, payload, reqPayload: undefined })
|
||||||
|
|
||||||
await waitJobs([ this.server ])
|
await waitJobs([ this.server ])
|
||||||
|
|
||||||
|
|
122
packages/tests/src/peertube-runner/custom-upload.ts
Normal file
122
packages/tests/src/peertube-runner/custom-upload.ts
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
|
||||||
|
|
||||||
|
import { HttpStatusCode, RunnerJobState, RunnerJobVODPayload } from '@peertube/peertube-models'
|
||||||
|
import {
|
||||||
|
cleanupTests,
|
||||||
|
createSingleServer,
|
||||||
|
makeGetRequest,
|
||||||
|
PeerTubeServer,
|
||||||
|
setAccessTokensToServers,
|
||||||
|
setDefaultVideoChannel,
|
||||||
|
waitJobs
|
||||||
|
} from '@peertube/peertube-server-commands'
|
||||||
|
import { MockUpload } from '@tests/shared/mock-servers/mock-upload.js'
|
||||||
|
import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js'
|
||||||
|
import { SQLCommand } from '@tests/shared/sql-command.js'
|
||||||
|
import { wait } from '../../../core-utils/src/common/time.js'
|
||||||
|
|
||||||
|
describe('Test peertube-runner custom upload', function () {
|
||||||
|
let server: PeerTubeServer
|
||||||
|
let peertubeRunner: PeerTubeRunnerProcess
|
||||||
|
|
||||||
|
let sqlCommand: SQLCommand
|
||||||
|
let mockUploadServerUrl: string
|
||||||
|
let transcoded: string
|
||||||
|
|
||||||
|
const mockUpload = new MockUpload()
|
||||||
|
|
||||||
|
async function registerRunner () {
|
||||||
|
const registrationToken = await server.runnerRegistrationTokens.getFirstRegistrationToken()
|
||||||
|
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' })
|
||||||
|
}
|
||||||
|
|
||||||
|
async function unregisterRunner () {
|
||||||
|
await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' })
|
||||||
|
}
|
||||||
|
|
||||||
|
async function updatePayload (method?: 'PUT' | 'POST') {
|
||||||
|
const { data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] })
|
||||||
|
|
||||||
|
for (const job of data) {
|
||||||
|
const payload = job.payload as RunnerJobVODPayload
|
||||||
|
|
||||||
|
payload.output.videoFileCustomUpload = {
|
||||||
|
method,
|
||||||
|
url: mockUploadServerUrl + '/upload-file'
|
||||||
|
}
|
||||||
|
|
||||||
|
await sqlCommand.setRunnerJobPayload(job.uuid, payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
before(async function () {
|
||||||
|
this.timeout(120_000)
|
||||||
|
|
||||||
|
server = await createSingleServer(1)
|
||||||
|
|
||||||
|
await setAccessTokensToServers([ server ])
|
||||||
|
await setDefaultVideoChannel([ server ])
|
||||||
|
|
||||||
|
await server.config.enableTranscoding()
|
||||||
|
const { uuid } = await server.videos.quickUpload({ name: 'transcoded' })
|
||||||
|
transcoded = uuid
|
||||||
|
await waitJobs([ server ])
|
||||||
|
|
||||||
|
await server.config.enableRemoteTranscoding()
|
||||||
|
|
||||||
|
peertubeRunner = new PeerTubeRunnerProcess(server)
|
||||||
|
await peertubeRunner.runServer()
|
||||||
|
|
||||||
|
const uploadPort = await mockUpload.initialize()
|
||||||
|
mockUploadServerUrl = 'http://127.0.0.1:' + uploadPort
|
||||||
|
|
||||||
|
sqlCommand = new SQLCommand(server)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should upload the file on another endpoint for web video', async function () {
|
||||||
|
await server.videos.quickUpload({ name: 'video 1' })
|
||||||
|
await server.videos.quickUpload({ name: 'video 2' })
|
||||||
|
await waitJobs([ server ])
|
||||||
|
|
||||||
|
await updatePayload('POST')
|
||||||
|
await registerRunner()
|
||||||
|
|
||||||
|
do {
|
||||||
|
const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
|
||||||
|
|
||||||
|
// 2 x 5 retries because the server doesn't accept non existing files
|
||||||
|
if (body.length === 10 && body.every(f => f.method === 'POST')) break
|
||||||
|
await wait(500)
|
||||||
|
} while (true)
|
||||||
|
|
||||||
|
await unregisterRunner()
|
||||||
|
mockUpload.cleanUpload()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should upload the file on another endpoint for HLS', async function () {
|
||||||
|
await server.videos.runTranscoding({ transcodingType: 'hls', videoId: transcoded })
|
||||||
|
await waitJobs([ server ])
|
||||||
|
|
||||||
|
await updatePayload()
|
||||||
|
await registerRunner()
|
||||||
|
|
||||||
|
do {
|
||||||
|
const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
|
||||||
|
|
||||||
|
// 5 retries because the server doesn't accept non existing files
|
||||||
|
if (body.length === 5 && body.every(f => f.method === 'PUT')) break
|
||||||
|
await wait(500)
|
||||||
|
} while (true)
|
||||||
|
|
||||||
|
await unregisterRunner()
|
||||||
|
mockUpload.cleanUpload()
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async function () {
|
||||||
|
peertubeRunner.kill()
|
||||||
|
|
||||||
|
await mockUpload.terminate()
|
||||||
|
await sqlCommand.cleanup()
|
||||||
|
await cleanupTests([ server ])
|
||||||
|
})
|
||||||
|
})
|
|
@ -1,4 +1,5 @@
|
||||||
export * from './client-cli.js'
|
export * from './client-cli.js'
|
||||||
|
export * from './custom-upload.js'
|
||||||
export * from './live-transcoding.js'
|
export * from './live-transcoding.js'
|
||||||
export * from './replace-file.js'
|
export * from './replace-file.js'
|
||||||
export * from './shutdown.js'
|
export * from './shutdown.js'
|
||||||
|
|
42
packages/tests/src/shared/mock-servers/mock-upload.ts
Normal file
42
packages/tests/src/shared/mock-servers/mock-upload.ts
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
import express from 'express'
|
||||||
|
import { Server } from 'http'
|
||||||
|
import multer from 'multer'
|
||||||
|
import { getPort, randomListen, terminateServer } from './shared.js'
|
||||||
|
import { HttpStatusCode } from '../../../../models/src/http/http-status-codes.js'
|
||||||
|
|
||||||
|
export class MockUpload {
|
||||||
|
private server: Server
|
||||||
|
|
||||||
|
private uploads: { method: string, file: Buffer }[] = []
|
||||||
|
|
||||||
|
async initialize () {
|
||||||
|
const app = express()
|
||||||
|
|
||||||
|
app.all(
|
||||||
|
'/upload-file',
|
||||||
|
multer({ storage: multer.memoryStorage() }).single('file'),
|
||||||
|
(req: express.Request, res: express.Response, next: express.NextFunction) => {
|
||||||
|
if (process.env.DEBUG) console.log('Receiving request on upload mock server.', req.url)
|
||||||
|
|
||||||
|
this.uploads.push({ method: req.method, file: req.file.buffer })
|
||||||
|
|
||||||
|
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
|
||||||
|
})
|
||||||
|
|
||||||
|
app.get('/uploaded-files', (req: express.Request, res: express.Response) => {
|
||||||
|
return res.json(this.uploads)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.server = await randomListen(app)
|
||||||
|
|
||||||
|
return getPort(this.server)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanUpload () {
|
||||||
|
this.uploads = []
|
||||||
|
}
|
||||||
|
|
||||||
|
terminate () {
|
||||||
|
return terminateServer(this.server)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,9 @@
|
||||||
|
import { RunnerJobType } from '@peertube/peertube-models'
|
||||||
|
import { root } from '@peertube/peertube-node-utils'
|
||||||
|
import { PeerTubeServer } from '@peertube/peertube-server-commands'
|
||||||
import { ChildProcess, fork, ForkOptions } from 'child_process'
|
import { ChildProcess, fork, ForkOptions } from 'child_process'
|
||||||
import { execaNode } from 'execa'
|
import { execaNode } from 'execa'
|
||||||
import { join } from 'path'
|
import { join } from 'path'
|
||||||
import { root } from '@peertube/peertube-node-utils'
|
|
||||||
import { PeerTubeServer } from '@peertube/peertube-server-commands'
|
|
||||||
import { RunnerJobType } from '../../../models/src/runners/runner-job-type.type.js'
|
|
||||||
|
|
||||||
export class PeerTubeRunnerProcess {
|
export class PeerTubeRunnerProcess {
|
||||||
private app?: ChildProcess
|
private app?: ChildProcess
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { QueryTypes, Sequelize } from 'sequelize'
|
|
||||||
import { forceNumber } from '@peertube/peertube-core-utils'
|
import { forceNumber } from '@peertube/peertube-core-utils'
|
||||||
|
import { FileStorageType, RunnerJobPayload } from '@peertube/peertube-models'
|
||||||
import { PeerTubeServer } from '@peertube/peertube-server-commands'
|
import { PeerTubeServer } from '@peertube/peertube-server-commands'
|
||||||
import { FileStorageType } from '@peertube/peertube-models'
|
import { QueryTypes, Sequelize } from 'sequelize'
|
||||||
|
|
||||||
export class SQLCommand {
|
export class SQLCommand {
|
||||||
private sequelize: Sequelize
|
private sequelize: Sequelize
|
||||||
|
@ -141,6 +141,17 @@ export class SQLCommand {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
setRunnerJobPayload (uuid: string, payload: RunnerJobPayload) {
|
||||||
|
return this.updateQuery(
|
||||||
|
`UPDATE "runnerJob" SET "payload" = :payload WHERE "uuid" = :uuid`,
|
||||||
|
{ uuid, payload: JSON.stringify(payload) }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async cleanup () {
|
async cleanup () {
|
||||||
if (!this.sequelize) return
|
if (!this.sequelize) return
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,8 @@ export class TranscriptionJobHandler extends AbstractJobHandler<CreateOptions, R
|
||||||
const payload: RunnerJobTranscriptionPayload = {
|
const payload: RunnerJobTranscriptionPayload = {
|
||||||
input: {
|
input: {
|
||||||
videoFileUrl: generateRunnerTranscodingAudioInputFileUrl(jobUUID, video.uuid)
|
videoFileUrl: generateRunnerTranscodingAudioInputFileUrl(jobUUID, video.uuid)
|
||||||
}
|
},
|
||||||
|
output: {}
|
||||||
}
|
}
|
||||||
|
|
||||||
const privatePayload: RunnerJobTranscriptionPrivatePayload = {
|
const privatePayload: RunnerJobTranscriptionPrivatePayload = {
|
||||||
|
|
|
@ -48,6 +48,7 @@ export class VideoStudioTranscodingJobHandler extends AbstractJobHandler<CreateO
|
||||||
? [ generateRunnerTranscodingAudioInputFileUrl(jobUUID, video.uuid) ]
|
? [ generateRunnerTranscodingAudioInputFileUrl(jobUUID, video.uuid) ]
|
||||||
: []
|
: []
|
||||||
},
|
},
|
||||||
|
output: {},
|
||||||
tasks: tasks.map(t => {
|
tasks: tasks.map(t => {
|
||||||
if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) {
|
if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) {
|
||||||
return {
|
return {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue