1
0
Fork 0
mirror of https://github.com/Chocobozzz/PeerTube.git synced 2025-10-03 17:59:37 +02:00

Support query params in runner custom upload

This commit is contained in:
Chocobozzz 2025-04-16 15:00:59 +02:00
parent e498a80638
commit b7a974e448
No known key found for this signature in database
GPG key ID: 583A612D890159BE
6 changed files with 197 additions and 152 deletions

View file

@ -6,7 +6,6 @@ import { VideoEdit } from '@app/+videos-publish-manage/shared-manage/common/vide
import { VideoUploadService } from '@app/+videos-publish-manage/shared-manage/common/video-upload.service'
import { VideoManageController } from '@app/+videos-publish-manage/shared-manage/video-manage-controller.service'
import { CanComponentDeactivate, CanDeactivateGuard, HooksService, MetaService, Notifier, ServerService } from '@app/core'
import { AlertComponent } from '@app/shared/shared-main/common/alert.component'
import { NgbTooltip } from '@ng-bootstrap/ng-bootstrap'
import { UserVideoQuota, VideoPrivacyType } from '@peertube/peertube-models'
import debug from 'debug'

View file

@ -17,10 +17,12 @@ export type CommonRequestParams = {
accept?: string
host?: string
token?: string
headers?: { [ name: string ]: string }
headers?: { [name: string]: string }
type?: string
xForwardedFor?: string
expectedStatus?: HttpStatusCodeType
query?: { [id: string]: any }
rawQuery?: string
}
export function makeRawRequest (options: {
@ -29,10 +31,10 @@ export function makeRawRequest (options: {
expectedStatus?: HttpStatusCodeType
responseType?: string
range?: string
query?: { [ id: string ]: string }
query?: { [id: string]: string }
method?: 'GET' | 'POST'
accept?: string
headers?: { [ name: string ]: string }
headers?: { [name: string]: string }
redirects?: number
}) {
const { host, protocol, pathname, searchParams } = new URL(options.url)
@ -68,15 +70,9 @@ export const makeFileRequest = (url: string) => {
})
}
export function makeGetRequest (options: CommonRequestParams & {
query?: any
rawQuery?: string
}) {
export function makeGetRequest (options: CommonRequestParams) {
const req = request(options.url).get(options.path)
if (options.query) req.query(options.query)
if (options.rawQuery) req.query(options.rawQuery)
return buildRequest(req, { contentType: 'application/json', expectedStatus: HttpStatusCode.BAD_REQUEST_400, ...options })
}
@ -110,24 +106,25 @@ export function makeActivityPubRawRequest (url: string, expectedStatus: HttpStat
// ---------------------------------------------------------------------------
export function makeDeleteRequest (options: CommonRequestParams & {
query?: any
rawQuery?: string
}) {
export function makeDeleteRequest (
options: CommonRequestParams & {
query?: any
rawQuery?: string
}
) {
const req = request(options.url).delete(options.path)
if (options.query) req.query(options.query)
if (options.rawQuery) req.query(options.rawQuery)
return buildRequest(req, { accept: 'application/json', expectedStatus: HttpStatusCode.BAD_REQUEST_400, ...options })
}
export function makeUploadRequest (options: CommonRequestParams & {
method?: 'POST' | 'PUT'
export function makeUploadRequest (
options: CommonRequestParams & {
method?: 'POST' | 'PUT'
fields: { [ fieldName: string ]: any }
attaches?: { [ attachName: string ]: any | any[] }
}) {
fields: { [fieldName: string]: any }
attaches?: { [attachName: string]: any | any[] }
}
) {
let req = options.method === 'PUT'
? request(options.url).put(options.path)
: request(options.url).post(options.path)
@ -161,11 +158,13 @@ export function makeUploadRequest (options: CommonRequestParams & {
return req
}
export function makePostBodyRequest (options: CommonRequestParams & {
fields?: { [ fieldName: string ]: any }
}) {
export function makePostBodyRequest (
options: CommonRequestParams & {
fields?: { [fieldName: string]: any }
}
) {
const req = request(options.url).post(options.path)
.send(options.fields)
.send(options.fields)
return buildRequest(req, { accept: 'application/json', expectedStatus: HttpStatusCode.BAD_REQUEST_400, ...options })
}
@ -174,12 +173,12 @@ export function makePutBodyRequest (options: {
url: string
path: string
token?: string
fields: { [ fieldName: string ]: any }
fields: { [fieldName: string]: any }
expectedStatus?: HttpStatusCodeType
headers?: { [name: string]: string }
}) {
const req = request(options.url).put(options.path)
.send(options.fields)
.send(options.fields)
return buildRequest(req, { accept: 'application/json', expectedStatus: HttpStatusCode.BAD_REQUEST_400, ...options })
}
@ -205,7 +204,7 @@ export function decodeQueryString (path: string) {
// ---------------------------------------------------------------------------
export function unwrapBody <T> (test: request.Test): Promise<T> {
export function unwrapBody<T> (test: request.Test): Promise<T> {
return test.then(res => res.body)
}
@ -213,7 +212,7 @@ export function unwrapText (test: request.Test): Promise<string> {
return test.then(res => res.text)
}
export function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> {
export function unwrapBodyOrDecodeToJSON<T> (test: request.Test): Promise<T> {
return test.then(res => {
if (res.body instanceof Buffer) {
try {
@ -255,6 +254,8 @@ function buildRequest (req: request.Test, options: CommonRequestParams) {
if (options.redirects) req.redirects(options.redirects)
if (options.xForwardedFor) req.set('X-Forwarded-For', options.xForwardedFor)
if (options.type) req.type(options.type)
if (options.query) req.query(options.query)
if (options.rawQuery) req.query(options.rawQuery)
Object.keys(options.headers || {}).forEach(name => {
req.set(name, options.headers[name])
@ -262,12 +263,13 @@ function buildRequest (req: request.Test, options: CommonRequestParams) {
return req.expect(res => {
if (options.expectedStatus && res.status !== options.expectedStatus) {
const err = new Error(`Expected status ${options.expectedStatus}, got ${res.status}. ` +
`\nThe server responded: "${res.body?.error ?? res.text}".\n` +
'You may take a closer look at the logs. To see how to do so, check out this page: ' +
'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs');
(err as any).res = res
const err = new Error(
`Expected status ${options.expectedStatus}, got ${res.status}. ` +
`\nThe server responded: "${res.body?.error ?? res.text}".\n` +
'You may take a closer look at the logs. To see how to do so, check out this page: ' +
'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs'
)
;(err as any).res = res
throw err
}
@ -276,7 +278,7 @@ function buildRequest (req: request.Test, options: CommonRequestParams) {
})
}
function buildFields (req: request.Test, fields: { [ fieldName: string ]: any }, namespace?: string) {
function buildFields (req: request.Test, fields: { [fieldName: string]: any }, namespace?: string) {
if (!fields) return
let formKey: string

View file

@ -36,7 +36,6 @@ import { waitJobs } from '../server/jobs.js'
import { AbstractCommand, OverrideCommandOptions } from '../shared/index.js'
export class RunnerJobsCommand extends AbstractCommand {
list (options: OverrideCommandOptions & ListRunnerJobsQuery = {}) {
const path = '/api/v1/runners/jobs'
@ -111,7 +110,7 @@ export class RunnerJobsCommand extends AbstractCommand {
// ---------------------------------------------------------------------------
accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
accept<T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'
return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
@ -295,14 +294,16 @@ export class RunnerJobsCommand extends AbstractCommand {
}
}
private async uploadRunnerJobRequest (options: OverrideCommandOptions & {
path: string
private async uploadRunnerJobRequest (
options: OverrideCommandOptions & {
path: string
fields: { [ fieldName: string ]: any }
attaches: { [ fieldName: string ]: any }
fields: { [fieldName: string]: any }
attaches: { [fieldName: string]: any }
customUploads?: (RunnerJobCustomUpload & { file: string | Blob })[]
}) {
customUploads?: (RunnerJobCustomUpload & { file: string | Blob })[]
}
) {
for (const customUpload of (options.customUploads || [])) {
await this.customUpload(customUpload)
}
@ -318,15 +319,18 @@ export class RunnerJobsCommand extends AbstractCommand {
private customUpload (options: RunnerJobCustomUpload & { file: Blob | string }) {
const parsedUrl = new URL(options.url)
const reqOptions = {
const reqOptions: Parameters<RunnerJobsCommand['postUploadRequest']>[0] = {
url: parsedUrl.origin,
path: parsedUrl.pathname,
rawQuery: parsedUrl.searchParams.toString(),
attaches: { file: options.file },
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
}
if (options.method === 'POST') return this.postUploadRequest(reqOptions)
if (options.method === 'POST') {
return this.postUploadRequest(reqOptions)
}
return this.putUploadRequest(reqOptions)
}

View file

@ -15,6 +15,7 @@ import {
unwrapText
} from '../requests/requests.js'
import { pick } from '@peertube/peertube-core-utils'
import { createReadStream } from 'fs'
import type { PeerTubeServer } from '../server/server.js'
@ -38,38 +39,30 @@ interface InternalCommonCommandOptions extends OverrideCommandOptions {
redirects?: number
range?: string
host?: string
headers?: { [ name: string ]: string }
headers?: { [name: string]: string }
requestType?: string
responseType?: string
xForwardedFor?: string
}
interface InternalGetCommandOptions extends InternalCommonCommandOptions {
query?: { [ id: string ]: any }
}
interface InternalDeleteCommandOptions extends InternalCommonCommandOptions {
query?: { [ id: string ]: any }
query?: { [id: string]: any }
rawQuery?: string
}
export abstract class AbstractCommand {
constructor (
protected server: PeerTubeServer
) {
}
protected getRequestBody <T> (options: InternalGetCommandOptions) {
protected getRequestBody<T> (options: InternalCommonCommandOptions) {
return unwrapBody<T>(this.getRequest(options))
}
protected getRequestText (options: InternalGetCommandOptions) {
protected getRequestText (options: InternalCommonCommandOptions) {
return unwrapText(this.getRequest(options))
}
protected getRawRequest (options: Omit<InternalGetCommandOptions, 'path'>) {
protected getRawRequest (options: Omit<InternalCommonCommandOptions, 'path'>) {
const { url, range } = options
const { host, protocol, pathname } = new URL(url)
@ -85,31 +78,20 @@ export abstract class AbstractCommand {
})
}
protected getRequest (options: InternalGetCommandOptions) {
const { query } = options
return makeGetRequest({
...this.buildCommonRequestOptions(options),
query
})
protected getRequest (options: InternalCommonCommandOptions) {
return makeGetRequest(this.buildCommonRequestOptions(options))
}
protected deleteRequest (options: InternalDeleteCommandOptions) {
const { query, rawQuery } = options
return makeDeleteRequest({
...this.buildCommonRequestOptions(options),
query,
rawQuery
})
protected deleteRequest (options: InternalCommonCommandOptions) {
return makeDeleteRequest(this.buildCommonRequestOptions(options))
}
protected putBodyRequest (options: InternalCommonCommandOptions & {
fields?: { [ fieldName: string ]: any }
headers?: { [name: string]: string }
}) {
protected putBodyRequest (
options: InternalCommonCommandOptions & {
fields?: { [fieldName: string]: any }
headers?: { [name: string]: string }
}
) {
const { fields, headers } = options
return makePutBodyRequest({
@ -120,10 +102,12 @@ export abstract class AbstractCommand {
})
}
protected postBodyRequest (options: InternalCommonCommandOptions & {
fields?: { [ fieldName: string ]: any }
headers?: { [name: string]: string }
}) {
protected postBodyRequest (
options: InternalCommonCommandOptions & {
fields?: { [fieldName: string]: any }
headers?: { [name: string]: string }
}
) {
const { fields, headers } = options
return makePostBodyRequest({
@ -134,10 +118,12 @@ export abstract class AbstractCommand {
})
}
protected postUploadRequest (options: InternalCommonCommandOptions & {
fields?: { [ fieldName: string ]: any }
attaches?: { [ fieldName: string ]: any }
}) {
protected postUploadRequest (
options: InternalCommonCommandOptions & {
fields?: { [fieldName: string]: any }
attaches?: { [fieldName: string]: any }
}
) {
const { fields, attaches } = options
return makeUploadRequest({
@ -149,10 +135,12 @@ export abstract class AbstractCommand {
})
}
protected putUploadRequest (options: InternalCommonCommandOptions & {
fields?: { [ fieldName: string ]: any }
attaches?: { [ fieldName: string ]: any }
}) {
protected putUploadRequest (
options: InternalCommonCommandOptions & {
fields?: { [fieldName: string]: any }
attaches?: { [fieldName: string]: any }
}
) {
const { fields, attaches } = options
return makeUploadRequest({
@ -164,10 +152,12 @@ export abstract class AbstractCommand {
})
}
protected updateImageRequest (options: InternalCommonCommandOptions & {
fixture: string
fieldname: string
}) {
protected updateImageRequest (
options: InternalCommonCommandOptions & {
fixture: string
fieldname: string
}
) {
const filePath = isAbsolute(options.fixture)
? options.fixture
: buildAbsoluteFixturePath(options.fixture)
@ -181,24 +171,28 @@ export abstract class AbstractCommand {
}
protected buildCommonRequestOptions (options: InternalCommonCommandOptions) {
const { url, path, redirects, contentType, accept, range, host, headers, requestType, xForwardedFor, responseType } = options
const { url, path, requestType } = options
return {
url: url ?? this.server.url,
path,
type: requestType,
token: this.buildCommonRequestToken(options),
expectedStatus: this.buildExpectedStatus(options),
redirects,
contentType,
range,
host,
accept,
headers,
type: requestType,
responseType,
xForwardedFor
...pick(options, [
'redirects',
'contentType',
'range',
'host',
'accept',
'headers',
'responseType',
'xForwardedFor',
'query',
'rawQuery'
])
}
}
@ -226,15 +220,17 @@ export abstract class AbstractCommand {
// ---------------------------------------------------------------------------
protected async buildResumeUpload <T> (options: OverrideCommandOptions & {
path: string
protected async buildResumeUpload<T> (
options: OverrideCommandOptions & {
path: string
fixture: string
attaches?: Record<string, string>
fields?: Record<string, any>
fixture: string
attaches?: Record<string, string>
fields?: Record<string, any>
completedExpectedStatus?: HttpStatusCodeType // When the upload is finished
}): Promise<T> {
completedExpectedStatus?: HttpStatusCodeType // When the upload is finished
}
): Promise<T> {
const { path, fixture, expectedStatus = HttpStatusCode.OK_200, completedExpectedStatus } = options
let size = 0
@ -304,19 +300,21 @@ export abstract class AbstractCommand {
return initializeSessionRes.body.video || initializeSessionRes.body
}
protected async prepareResumableUpload (options: OverrideCommandOptions & {
path: string
protected async prepareResumableUpload (
options: OverrideCommandOptions & {
path: string
fixture: string
size: number
mimetype: string
fixture: string
size: number
mimetype: string
attaches?: Record<string, string>
fields?: Record<string, any>
attaches?: Record<string, string>
fields?: Record<string, any>
originalName?: string
lastModified?: number
}) {
originalName?: string
lastModified?: number
}
) {
const { path, attaches = {}, fields = {}, originalName, lastModified, fixture, size, mimetype } = options
const uploadOptions = {
@ -347,15 +345,17 @@ export abstract class AbstractCommand {
return this.postUploadRequest(uploadOptions)
}
protected async sendResumableChunks <T> (options: OverrideCommandOptions & {
pathUploadId: string
path: string
videoFilePath: string
size: number
contentLength?: number
contentRangeBuilder?: (start: number, chunk: any) => string
digestBuilder?: (chunk: any) => string
}) {
protected async sendResumableChunks<T> (
options: OverrideCommandOptions & {
pathUploadId: string
path: string
videoFilePath: string
size: number
contentLength?: number
contentRangeBuilder?: (start: number, chunk: any) => string
digestBuilder?: (chunk: any) => string
}
) {
const {
path,
pathUploadId,
@ -414,7 +414,8 @@ export abstract class AbstractCommand {
readable.off('data', onData)
// eslint-disable-next-line max-len
const message = `Incorrect transient behaviour sending intermediary chunks. Status code is ${res.statusCode} instead of ${expectedStatus}`
const message =
`Incorrect transient behaviour sending intermediary chunks. Status code is ${res.statusCode} instead of ${expectedStatus}`
return reject(new Error(message))
}
}
@ -427,10 +428,12 @@ export abstract class AbstractCommand {
})
}
protected endResumableUpload (options: OverrideCommandOptions & {
path: string
pathUploadId: string
}) {
protected endResumableUpload (
options: OverrideCommandOptions & {
path: string
pathUploadId: string
}
) {
return this.deleteRequest({
...options,

View file

@ -11,9 +11,10 @@ import {
setDefaultVideoChannel,
waitJobs
} from '@peertube/peertube-server-commands'
import { MockUpload } from '@tests/shared/mock-servers/mock-upload.js'
import { MockUpload, MockUploadStore } from '@tests/shared/mock-servers/mock-upload.js'
import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js'
import { SQLCommand } from '@tests/shared/sql-command.js'
import { expect } from 'chai'
describe('Test peertube-runner custom upload', function () {
let server: PeerTubeServer
@ -34,7 +35,16 @@ describe('Test peertube-runner custom upload', function () {
await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' })
}
async function updatePayload (method?: 'PUT' | 'POST') {
async function updatePayload (options: {
method: 'PUT' | 'POST'
query?: string
}) {
const { method, query } = options
const urlSuffix = query
? `?${query}`
: ''
const { data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] })
for (const job of data) {
@ -42,7 +52,7 @@ describe('Test peertube-runner custom upload', function () {
payload.output.videoFileCustomUpload = {
method,
url: mockUploadServerUrl + '/upload-file'
url: mockUploadServerUrl + '/upload-file' + urlSuffix
}
await sqlCommand.setRunnerJobPayload(job.uuid, payload)
@ -74,18 +84,28 @@ describe('Test peertube-runner custom upload', function () {
})
it('Should upload the file on another endpoint for web video', async function () {
await server.config.enableTranscoding({ hls: false, webVideo: true })
await server.videos.quickUpload({ name: 'video 1' })
await server.videos.quickUpload({ name: 'video 2' })
await waitJobs([ server ])
await updatePayload('POST')
await updatePayload({ method: 'POST' })
await registerRunner()
do {
const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
const res = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
const body = res.body as MockUploadStore[]
// 2 x 5 retries because the server doesn't accept non existing files
if (body.length === 10 && body.every(f => f.method === 'POST')) break
if (body.length === 10 && body.every(f => f.method === 'POST')) {
for (const b of body) {
expect(Object.keys(b.query)).to.deep.equal([])
}
break
}
await wait(500)
} while (true)
@ -97,14 +117,22 @@ describe('Test peertube-runner custom upload', function () {
await server.videos.runTranscoding({ transcodingType: 'hls', videoId: transcoded })
await waitJobs([ server ])
await updatePayload()
await updatePayload({ method: 'PUT', query: 'key1=value1&key2=value2' })
await registerRunner()
do {
const { body } = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
const res = await makeGetRequest({ url: mockUploadServerUrl, path: '/uploaded-files', expectedStatus: HttpStatusCode.OK_200 })
const body = res.body as MockUploadStore[]
// 5 retries because the server doesn't accept non existing files
if (body.length === 5 && body.every(f => f.method === 'PUT')) break
if (body.length === 5 && body.every(f => f.method === 'PUT')) {
for (const b of body) {
expect(b.query).to.deep.equal({ key1: 'value1', key2: 'value2' })
}
break
}
await wait(500)
} while (true)

View file

@ -4,10 +4,16 @@ import { Server } from 'http'
import multer from 'multer'
import { getPort, randomListen, terminateServer } from './shared.js'
export type MockUploadStore = {
method: string
file: Buffer
query: Record<string, any>
}
export class MockUpload {
private server: Server
private uploads: { method: string, file: Buffer }[] = []
private uploads: MockUploadStore[] = []
async initialize () {
const app = express()
@ -16,12 +22,15 @@ export class MockUpload {
'/upload-file',
multer({ storage: multer.memoryStorage() }).single('file'),
(req: express.Request, res: express.Response, next: express.NextFunction) => {
if (process.env.DEBUG) console.log('Receiving request on upload mock server.', req.url)
if (process.env.DEBUG) {
console.log('Receiving request on upload mock server.', { url: req.originalUrl, query: req.query })
}
this.uploads.push({ method: req.method, file: req.file.buffer })
this.uploads.push({ method: req.method, file: req.file.buffer, query: req.query })
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
})
}
)
app.get('/uploaded-files', (req: express.Request, res: express.Response) => {
return res.json(this.uploads)