From a9069d0d0bca7890f3662976e9ab1d43b2c68fc9 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 13 May 2025 13:54:19 +0200 Subject: [PATCH] Fix crash on download stream error --- .../tests/src/api/videos/generate-download.ts | 33 +++- server/core/lib/video-file.ts | 157 +++++++++++------- 2 files changed, 123 insertions(+), 67 deletions(-) diff --git a/packages/tests/src/api/videos/generate-download.ts b/packages/tests/src/api/videos/generate-download.ts index 0745ed843..f635ba29a 100644 --- a/packages/tests/src/api/videos/generate-download.ts +++ b/packages/tests/src/api/videos/generate-download.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ import { getHLS } from '@peertube/peertube-core-utils' -import { VideoDetails, VideoFile, VideoResolution } from '@peertube/peertube-models' +import { HttpStatusCode, VideoDetails, VideoFile, VideoResolution } from '@peertube/peertube-models' import { buildSUUID } from '@peertube/peertube-node-utils' import { ObjectStorageCommand, @@ -17,8 +17,11 @@ import { checkTmpIsEmpty } from '@tests/shared/directories.js' import { probeResBody } from '@tests/shared/videos.js' import { expect } from 'chai' import { FfprobeData } from 'fluent-ffmpeg' +import { remove } from 'fs-extra' +import { basename } from 'path' describe('Test generate download', function () { + const resolutions = [ VideoResolution.H_NOVIDEO, VideoResolution.H_144P ] let servers: PeerTubeServer[] before(async function () { @@ -48,8 +51,6 @@ describe('Test generate download', function () { await server.run(objectStorage.getDefaultMockConfig()) } - const resolutions = [ VideoResolution.H_NOVIDEO, VideoResolution.H_144P ] - { await server.config.enableTranscoding({ hls: true, webVideo: true, splitAudioAndVideo: false, resolutions }) await server.videos.quickUpload({ name: 'common-' + seed }) @@ -132,13 +133,37 @@ describe('Test generate download', function () { }) } + describe('Download crash', function () { + it('Should not crash the server on non existing file', async function () { + this.timeout(120000) + + await servers[0].config.enableTranscoding({ webVideo: false, hls: true, splitAudioAndVideo: true, resolutions }) + const { uuid } = await servers[0].videos.quickUpload({ name: 'crash' }) + await waitJobs(servers) + + for (const server of servers) { + const video = await server.videos.get({ id: uuid }) + const file = getHLS(video).files.find(f => f.hasVideo) + + await remove(servers[0].getDirectoryPath('streaming-playlists/hls/' + uuid + '/' + basename(file.fileUrl))) + + await server.videos.generateDownload({ + videoId: uuid, + videoFileIds: [ file.id ], + expectedStatus: server === servers[0] + ? HttpStatusCode.OK_200 + : HttpStatusCode.INTERNAL_SERVER_ERROR_500 + }) + } + }) + }) + for (const objectStorage of [ undefined, new ObjectStorageCommand() ]) { const testName = objectStorage ? 'On Object Storage' : 'On filesystem' describe(testName, function () { - describe('Videos on local server', function () { runSuite(() => servers[0], objectStorage) }) diff --git a/server/core/lib/video-file.ts b/server/core/lib/video-file.ts index 2a6dde2f0..bebe11533 100644 --- a/server/core/lib/video-file.ts +++ b/server/core/lib/video-file.ts @@ -272,84 +272,112 @@ export async function muxToMergeVideoFiles (options: { const inputs: (string | Readable)[] = [] const tmpDestinations: string[] = [] + let ffmpegContainer: FFmpegContainer - try { - let maxResolution = 0 + return new Promise(async (res, rej) => { + const cleanup = async () => { + for (const destination of tmpDestinations) { + await remove(destination) + } - for (const videoFile of videoFiles) { - if (!videoFile) continue + for (const input of inputs) { + if (input instanceof Readable) { + if (!input.destroyed) input.destroy() + } + } - maxResolution = Math.max(maxResolution, videoFile.resolution) - - const { input, isTmpDestination } = await buildMuxInput(video, videoFile) - - inputs.push(input) - - if (isTmpDestination === true) tmpDestinations.push(input) + if (ffmpegContainer) { + ffmpegContainer.forceKill() + ffmpegContainer = undefined + } } - // Include cover to audio file? - const { coverPath, isTmpDestination } = maxResolution === 0 - ? await buildCoverInput(video) - : { coverPath: undefined, isTmpDestination: false } - - if (coverPath && isTmpDestination) tmpDestinations.push(coverPath) - - const inputsToLog = inputs.map(i => { - if (typeof i === 'string') return i - - return 'ReadableStream' - }) - - logger.info(`Muxing files for video ${video.url}`, { inputs: inputsToLog, ...lTags(video.uuid) }) - - const ffmpegContainer = new FFmpegContainer(getFFmpegCommandWrapperOptions('vod')) - try { - await ffmpegContainer.mergeInputs({ - inputs, - output, - logError: false, + let maxResolution = 0 - // Include a cover if this is an audio file - coverPath + for (const videoFile of videoFiles) { + if (!videoFile) continue + + maxResolution = Math.max(maxResolution, videoFile.resolution) + + const { input, isTmpDestination } = await buildMuxInput( + video, + videoFile, + err => { + logger.warn(`Cannot build mux input of video ${video.url}`, { err, inputs: inputsToLog, ...lTags(video.uuid) }) + + cleanup() + .catch(cleanupErr => logger.error('Cannot cleanup after mux error', { err: cleanupErr, ...lTags(video.uuid) })) + + rej(buildRequestError(err as any)) + } + ) + + inputs.push(input) + + if (isTmpDestination === true) tmpDestinations.push(input) + } + + // Include cover to audio file? + const { coverPath, isTmpDestination } = maxResolution === 0 + ? await buildCoverInput(video) + : { coverPath: undefined, isTmpDestination: false } + + if (coverPath && isTmpDestination) tmpDestinations.push(coverPath) + + const inputsToLog = inputs.map(i => { + if (typeof i === 'string') return i + + return 'ReadableStream' }) - logger.info(`Mux ended for video ${video.url}`, { inputs: inputsToLog, ...lTags(video.uuid) }) + logger.info(`Muxing files for video ${video.url}`, { inputs: inputsToLog, ...lTags(video.uuid) }) + + ffmpegContainer = new FFmpegContainer(getFFmpegCommandWrapperOptions('vod')) + + try { + await ffmpegContainer.mergeInputs({ + inputs, + output, + logError: false, + + // Include a cover if this is an audio file + coverPath + }) + + logger.info(`Mux ended for video ${video.url}`, { inputs: inputsToLog, ...lTags(video.uuid) }) + + res() + } catch (err) { + const message = err?.message || '' + + if (message.includes('Output stream closed')) { + logger.info(`Client aborted mux for video ${video.url}`, lTags(video.uuid)) + return + } + + logger.warn(`Cannot mux files of video ${video.url}`, { err, inputs: inputsToLog, ...lTags(video.uuid) }) + + if (err.inputStreamError) { + err.inputStreamError = buildRequestError(err.inputStreamError) + } + + throw err + } finally { + ffmpegContainer.forceKill() + } } catch (err) { - const message = err?.message || '' - - if (message.includes('Output stream closed')) { - logger.info(`Client aborted mux for video ${video.url}`, lTags(video.uuid)) - return - } - - logger.warn(`Cannot mux files of video ${video.url}`, { err, inputs: inputsToLog, ...lTags(video.uuid) }) - - if (err.inputStreamError) { - err.inputStreamError = buildRequestError(err.inputStreamError) - } - - throw err + rej(err) } finally { - ffmpegContainer.forceKill() + await cleanup() } - } finally { - for (const destination of tmpDestinations) { - await remove(destination) - } - - for (const input of inputs) { - if (input instanceof Readable) { - if (!input.destroyed) input.destroy() - } - } - } + }) } async function buildMuxInput ( video: MVideo, - videoFile: MVideoFile + videoFile: MVideoFile, + onStreamError: (err: Error) => void ): Promise<{ input: Readable, isTmpDestination: false } | { input: string, isTmpDestination: boolean }> { // --------------------------------------------------------------------------- // Remote @@ -375,7 +403,10 @@ async function buildMuxInput ( return { input: destination, isTmpDestination: true } } - return { input: generateRequestStream(videoFile.fileUrl, { timeout, bodyKBLimit }), isTmpDestination: false } + return { + input: generateRequestStream(videoFile.fileUrl, { timeout, bodyKBLimit }).on('error', onStreamError), + isTmpDestination: false + } } // ---------------------------------------------------------------------------