1
0
Fork 0
mirror of https://github.com/Chocobozzz/PeerTube.git synced 2025-10-04 02:09:37 +02:00

Add runner server tests

This commit is contained in:
Chocobozzz 2023-04-21 15:00:01 +02:00 committed by Chocobozzz
parent 2fe978744e
commit d102de1b38
95 changed files with 4215 additions and 648 deletions

View file

@ -3,10 +3,10 @@ export * from './cli'
export * from './custom-pages'
export * from './feeds'
export * from './logs'
export * from './miscs'
export * from './moderation'
export * from './overviews'
export * from './requests'
export * from './runners'
export * from './search'
export * from './server'
export * from './socket'

View file

@ -1,2 +0,0 @@
export * from './sql-command'
export * from './webtorrent'

View file

@ -1,146 +0,0 @@
import { QueryTypes, Sequelize } from 'sequelize'
import { forceNumber } from '@shared/core-utils'
import { AbstractCommand } from '../shared'
export class SQLCommand extends AbstractCommand {
private sequelize: Sequelize
deleteAll (table: string) {
const seq = this.getSequelize()
const options = { type: QueryTypes.DELETE }
return seq.query(`DELETE FROM "${table}"`, options)
}
async getVideoShareCount () {
const [ { total } ] = await this.selectQuery<{ total: string }>(`SELECT COUNT(*) as total FROM "videoShare"`)
if (total === null) return 0
return parseInt(total, 10)
}
async getInternalFileUrl (fileId: number) {
return this.selectQuery<{ fileUrl: string }>(`SELECT "fileUrl" FROM "videoFile" WHERE id = :fileId`, { fileId })
.then(rows => rows[0].fileUrl)
}
setActorField (to: string, field: string, value: string) {
return this.updateQuery(`UPDATE actor SET ${this.escapeColumnName(field)} = :value WHERE url = :to`, { value, to })
}
setVideoField (uuid: string, field: string, value: string) {
return this.updateQuery(`UPDATE video SET ${this.escapeColumnName(field)} = :value WHERE uuid = :uuid`, { value, uuid })
}
setPlaylistField (uuid: string, field: string, value: string) {
return this.updateQuery(`UPDATE "videoPlaylist" SET ${this.escapeColumnName(field)} = :value WHERE uuid = :uuid`, { value, uuid })
}
async countVideoViewsOf (uuid: string) {
const query = 'SELECT SUM("videoView"."views") AS "total" FROM "videoView" ' +
`INNER JOIN "video" ON "video"."id" = "videoView"."videoId" WHERE "video"."uuid" = :uuid`
const [ { total } ] = await this.selectQuery<{ total: number }>(query, { uuid })
if (!total) return 0
return forceNumber(total)
}
getActorImage (filename: string) {
return this.selectQuery<{ width: number, height: number }>(`SELECT * FROM "actorImage" WHERE filename = :filename`, { filename })
.then(rows => rows[0])
}
// ---------------------------------------------------------------------------
setPluginVersion (pluginName: string, newVersion: string) {
return this.setPluginField(pluginName, 'version', newVersion)
}
setPluginLatestVersion (pluginName: string, newVersion: string) {
return this.setPluginField(pluginName, 'latestVersion', newVersion)
}
setPluginField (pluginName: string, field: string, value: string) {
return this.updateQuery(
`UPDATE "plugin" SET ${this.escapeColumnName(field)} = :value WHERE "name" = :pluginName`,
{ pluginName, value }
)
}
// ---------------------------------------------------------------------------
selectQuery <T extends object> (query: string, replacements: { [id: string]: string | number } = {}) {
const seq = this.getSequelize()
const options = {
type: QueryTypes.SELECT as QueryTypes.SELECT,
replacements
}
return seq.query<T>(query, options)
}
updateQuery (query: string, replacements: { [id: string]: string | number } = {}) {
const seq = this.getSequelize()
const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE, replacements }
return seq.query(query, options)
}
// ---------------------------------------------------------------------------
async getPlaylistInfohash (playlistId: number) {
const query = 'SELECT "p2pMediaLoaderInfohashes" FROM "videoStreamingPlaylist" WHERE id = :playlistId'
const result = await this.selectQuery<{ p2pMediaLoaderInfohashes: string }>(query, { playlistId })
if (!result || result.length === 0) return []
return result[0].p2pMediaLoaderInfohashes
}
// ---------------------------------------------------------------------------
setActorFollowScores (newScore: number) {
return this.updateQuery(`UPDATE "actorFollow" SET "score" = :newScore`, { newScore })
}
setTokenField (accessToken: string, field: string, value: string) {
return this.updateQuery(
`UPDATE "oAuthToken" SET ${this.escapeColumnName(field)} = :value WHERE "accessToken" = :accessToken`,
{ value, accessToken }
)
}
async cleanup () {
if (!this.sequelize) return
await this.sequelize.close()
this.sequelize = undefined
}
private getSequelize () {
if (this.sequelize) return this.sequelize
const dbname = 'peertube_test' + this.server.internalServerNumber
const username = 'peertube'
const password = 'peertube'
const host = '127.0.0.1'
const port = 5432
this.sequelize = new Sequelize(dbname, username, password, {
dialect: 'postgres',
host,
port,
logging: false
})
return this.sequelize
}
private escapeColumnName (columnName: string) {
return this.getSequelize().escape(columnName)
.replace(/^'/, '"')
.replace(/'$/, '"')
}
}

View file

@ -1,46 +0,0 @@
import { readFile } from 'fs-extra'
import parseTorrent from 'parse-torrent'
import { basename, join } from 'path'
import * as WebTorrent from 'webtorrent'
import { VideoFile } from '@shared/models'
import { PeerTubeServer } from '../server'
let webtorrent: WebTorrent.Instance
function webtorrentAdd (torrentId: string, refreshWebTorrent = false) {
const WebTorrent = require('webtorrent')
if (webtorrent && refreshWebTorrent) webtorrent.destroy()
if (!webtorrent || refreshWebTorrent) webtorrent = new WebTorrent()
webtorrent.on('error', err => console.error('Error in webtorrent', err))
return new Promise<WebTorrent.Torrent>(res => {
const torrent = webtorrent.add(torrentId, res)
torrent.on('error', err => console.error('Error in webtorrent torrent', err))
torrent.on('warning', warn => {
const msg = typeof warn === 'string'
? warn
: warn.message
if (msg.includes('Unsupported')) return
console.error('Warning in webtorrent torrent', warn)
})
})
}
async function parseTorrentVideo (server: PeerTubeServer, file: VideoFile) {
const torrentName = basename(file.torrentUrl)
const torrentPath = server.servers.buildDirectory(join('torrents', torrentName))
const data = await readFile(torrentPath)
return parseTorrent(data)
}
export {
webtorrentAdd,
parseTorrentVideo
}

View file

@ -10,6 +10,7 @@ export type CommonRequestParams = {
url: string
path?: string
contentType?: string
responseType?: string
range?: string
redirects?: number
accept?: string
@ -27,16 +28,23 @@ function makeRawRequest (options: {
expectedStatus?: HttpStatusCode
range?: string
query?: { [ id: string ]: string }
method?: 'GET' | 'POST'
}) {
const { host, protocol, pathname } = new URL(options.url)
return makeGetRequest({
const reqOptions = {
url: `${protocol}//${host}`,
path: pathname,
contentType: undefined,
...pick(options, [ 'expectedStatus', 'range', 'token', 'query' ])
})
}
if (options.method === 'POST') {
return makePostBodyRequest(reqOptions)
}
return makeGetRequest(reqOptions)
}
function makeGetRequest (options: CommonRequestParams & {
@ -135,6 +143,8 @@ function decodeQueryString (path: string) {
return decode(path.split('?')[1])
}
// ---------------------------------------------------------------------------
function unwrapBody <T> (test: request.Test): Promise<T> {
return test.then(res => res.body)
}
@ -149,7 +159,16 @@ function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> {
try {
return JSON.parse(new TextDecoder().decode(res.body))
} catch (err) {
console.error('Cannot decode JSON.', res.body)
console.error('Cannot decode JSON.', res.body instanceof Buffer ? res.body.toString() : res.body)
throw err
}
}
if (res.text) {
try {
return JSON.parse(res.text)
} catch (err) {
console.error('Cannot decode json', res.text)
throw err
}
}
@ -184,6 +203,7 @@ export {
function buildRequest (req: request.Test, options: CommonRequestParams) {
if (options.contentType) req.set('Accept', options.contentType)
if (options.responseType) req.responseType(options.responseType)
if (options.token) req.set('Authorization', 'Bearer ' + options.token)
if (options.range) req.set('Range', options.range)
if (options.accept) req.set('Accept', options.accept)
@ -196,13 +216,18 @@ function buildRequest (req: request.Test, options: CommonRequestParams) {
req.set(name, options.headers[name])
})
return req.expect((res) => {
return req.expect(res => {
if (options.expectedStatus && res.status !== options.expectedStatus) {
throw new Error(`Expected status ${options.expectedStatus}, got ${res.status}. ` +
const err = new Error(`Expected status ${options.expectedStatus}, got ${res.status}. ` +
`\nThe server responded: "${res.body?.error ?? res.text}".\n` +
'You may take a closer look at the logs. To see how to do so, check out this page: ' +
'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs')
'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs');
(err as any).res = res
throw err
}
return res
})
}

View file

@ -0,0 +1,3 @@
export * from './runner-jobs-command'
export * from './runner-registration-tokens-command'
export * from './runners-command'

View file

@ -0,0 +1,279 @@
import { omit, pick, wait } from '@shared/core-utils'
import {
AbortRunnerJobBody,
AcceptRunnerJobBody,
AcceptRunnerJobResult,
ErrorRunnerJobBody,
HttpStatusCode,
isHLSTranscodingPayloadSuccess,
isLiveRTMPHLSTranscodingUpdatePayload,
isWebVideoOrAudioMergeTranscodingPayloadSuccess,
RequestRunnerJobBody,
RequestRunnerJobResult,
ResultList,
RunnerJobAdmin,
RunnerJobLiveRTMPHLSTranscodingPayload,
RunnerJobPayload,
RunnerJobState,
RunnerJobSuccessBody,
RunnerJobSuccessPayload,
RunnerJobType,
RunnerJobUpdateBody,
RunnerJobVODPayload
} from '@shared/models'
import { unwrapBody } from '../requests'
import { waitJobs } from '../server'
import { AbstractCommand, OverrideCommandOptions } from '../shared'
export class RunnerJobsCommand extends AbstractCommand {
list (options: OverrideCommandOptions & {
start?: number
count?: number
sort?: string
search?: string
} = {}) {
const path = '/api/v1/runners/jobs'
return this.getRequestBody<ResultList<RunnerJobAdmin>>({
...options,
path,
query: pick(options, [ 'start', 'count', 'sort', 'search' ]),
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'
return this.postBodyRequest({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
// ---------------------------------------------------------------------------
request (options: OverrideCommandOptions & RequestRunnerJobBody) {
const path = '/api/v1/runners/jobs/request'
return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}
async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])
const { availableJobs } = await this.request(options)
return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobVODPayload>
}
async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])
const { availableJobs } = await this.request(options)
return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
}
// ---------------------------------------------------------------------------
accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'
return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}
abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'
return this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
const { payload } = options
const attaches: { [id: string]: any } = {}
let payloadWithoutFiles = payload
if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
if (payload.masterPlaylistFile) {
attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
}
attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
attaches[`payload[videoChunkFile]`] = payload.videoChunkFile
payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
}
return this.postUploadRequest({
...options,
path,
fields: {
...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
payload: payloadWithoutFiles
},
attaches,
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
return this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
const { payload } = options
const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
const attaches: { [id: string]: any } = {}
let payloadWithoutFiles = payload
if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
attaches[`payload[videoFile]`] = payload.videoFile
payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ])
}
if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ])
}
return this.postUploadRequest({
...options,
path,
attaches,
fields: {
...pick(options, [ 'jobToken', 'runnerToken' ]),
payload: payloadWithoutFiles
},
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
getInputFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
const { host, protocol, pathname } = new URL(options.url)
return this.postBodyRequest({
url: `${protocol}//${host}`,
path: pathname,
fields: pick(options, [ 'jobToken', 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
// ---------------------------------------------------------------------------
async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
const { availableJobs } = await this.request(options)
const job = options.type
? availableJobs.find(j => j.type === options.type)
: availableJobs[0]
return this.accept({ ...options, jobUUID: job.uuid })
}
async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
let jobUUID = jobUUIDToProcess
if (!jobUUID) {
const { availableJobs } = await this.request({ runnerToken })
jobUUID = availableJobs[0].uuid
}
const { job } = await this.accept({ runnerToken, jobUUID })
const jobToken = job.jobToken
const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
await this.success({ runnerToken, jobUUID, jobToken, payload })
await waitJobs([ this.server ])
return job
}
async cancelAllJobs (options: { state?: RunnerJobState } = {}) {
const { state } = options
const { data } = await this.list({ count: 100 })
for (const job of data) {
if (state && job.state.id !== state) continue
await this.cancelByAdmin({ jobUUID: job.uuid })
}
}
async getJob (options: OverrideCommandOptions & { uuid: string }) {
const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' })
return data.find(j => j.uuid === options.uuid)
}
async requestLiveJob (runnerToken: string) {
let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = []
while (availableJobs.length === 0) {
const result = await this.requestLive({ runnerToken })
availableJobs = result.availableJobs
if (availableJobs.length === 1) break
await wait(150)
}
return availableJobs[0]
}
}

View file

@ -0,0 +1,55 @@
import { pick } from '@shared/core-utils'
import { HttpStatusCode, ResultList, RunnerRegistrationToken } from '@shared/models'
import { AbstractCommand, OverrideCommandOptions } from '../shared'
export class RunnerRegistrationTokensCommand extends AbstractCommand {
list (options: OverrideCommandOptions & {
start?: number
count?: number
sort?: string
} = {}) {
const path = '/api/v1/runners/registration-tokens'
return this.getRequestBody<ResultList<RunnerRegistrationToken>>({
...options,
path,
query: pick(options, [ 'start', 'count', 'sort' ]),
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
generate (options: OverrideCommandOptions = {}) {
const path = '/api/v1/runners/registration-tokens/generate'
return this.postBodyRequest({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
delete (options: OverrideCommandOptions & {
id: number
}) {
const path = '/api/v1/runners/registration-tokens/' + options.id
return this.deleteRequest({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
async getFirstRegistrationToken (options: OverrideCommandOptions = {}) {
const { data } = await this.list(options)
return data[0].registrationToken
}
}

View file

@ -0,0 +1,77 @@
import { pick } from '@shared/core-utils'
import { HttpStatusCode, RegisterRunnerBody, RegisterRunnerResult, ResultList, Runner, UnregisterRunnerBody } from '@shared/models'
import { unwrapBody } from '../requests'
import { AbstractCommand, OverrideCommandOptions } from '../shared'
export class RunnersCommand extends AbstractCommand {
list (options: OverrideCommandOptions & {
start?: number
count?: number
sort?: string
} = {}) {
const path = '/api/v1/runners'
return this.getRequestBody<ResultList<Runner>>({
...options,
path,
query: pick(options, [ 'start', 'count', 'sort' ]),
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
register (options: OverrideCommandOptions & RegisterRunnerBody) {
const path = '/api/v1/runners/register'
return unwrapBody<RegisterRunnerResult>(this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'name', 'registrationToken', 'description' ]),
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}
unregister (options: OverrideCommandOptions & UnregisterRunnerBody) {
const path = '/api/v1/runners/unregister'
return this.postBodyRequest({
...options,
path,
fields: pick(options, [ 'runnerToken' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
delete (options: OverrideCommandOptions & {
id: number
}) {
const path = '/api/v1/runners/' + options.id
return this.deleteRequest({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
})
}
// ---------------------------------------------------------------------------
async autoRegisterRunner () {
const { data } = await this.server.runnerRegistrationTokens.list({ sort: 'createdAt' })
const { runnerToken } = await this.register({
name: 'runner',
registrationToken: data[0].registrationToken
})
return runnerToken
}
}

View file

@ -5,8 +5,9 @@ import { AbstractCommand, OverrideCommandOptions } from '../shared/abstract-comm
export class ConfigCommand extends AbstractCommand {
static getCustomConfigResolutions (enabled: boolean) {
static getCustomConfigResolutions (enabled: boolean, with0p = false) {
return {
'0p': enabled && with0p,
'144p': enabled,
'240p': enabled,
'360p': enabled,
@ -129,7 +130,8 @@ export class ConfigCommand extends AbstractCommand {
})
}
enableTranscoding (webtorrent = true, hls = true) {
// TODO: convert args to object
enableTranscoding (webtorrent = true, hls = true, with0p = false) {
return this.updateExistingSubConfig({
newConfig: {
transcoding: {
@ -138,7 +140,7 @@ export class ConfigCommand extends AbstractCommand {
allowAudioFiles: true,
allowAdditionalExtensions: true,
resolutions: ConfigCommand.getCustomConfigResolutions(true),
resolutions: ConfigCommand.getCustomConfigResolutions(true, with0p),
webtorrent: {
enabled: webtorrent
@ -151,6 +153,7 @@ export class ConfigCommand extends AbstractCommand {
})
}
// TODO: convert args to object
enableMinimumTranscoding (webtorrent = true, hls = true) {
return this.updateExistingSubConfig({
newConfig: {
@ -173,6 +176,25 @@ export class ConfigCommand extends AbstractCommand {
})
}
enableRemoteTranscoding () {
return this.updateExistingSubConfig({
newConfig: {
transcoding: {
remoteRunners: {
enabled: true
}
},
live: {
transcoding: {
remoteRunners: {
enabled: true
}
}
}
}
})
}
// ---------------------------------------------------------------------------
enableStudio () {
@ -363,6 +385,9 @@ export class ConfigCommand extends AbstractCommand {
},
transcoding: {
enabled: true,
remoteRunners: {
enabled: false
},
allowAdditionalExtensions: true,
allowAudioFiles: true,
threads: 1,
@ -398,6 +423,9 @@ export class ConfigCommand extends AbstractCommand {
maxUserLives: 50,
transcoding: {
enabled: true,
remoteRunners: {
enabled: false
},
threads: 4,
profile: 'default',
resolutions: {

View file

@ -1,16 +1,17 @@
import { expect } from 'chai'
import { wait } from '@shared/core-utils'
import { JobState, JobType } from '../../models'
import { JobState, JobType, RunnerJobState } from '../../models'
import { PeerTubeServer } from './server'
async function waitJobs (
serversArg: PeerTubeServer[] | PeerTubeServer,
options: {
skipDelayed?: boolean // default false
runnerJobs?: boolean // default false
} = {}
) {
const { skipDelayed = false } = options
const { skipDelayed = false, runnerJobs = false } = options
const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT
? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10)
@ -33,7 +34,8 @@ async function waitJobs (
// Check if each server has pending request
for (const server of servers) {
for (const state of states) {
const p = server.jobs.list({
const jobPromise = server.jobs.list({
state,
start: 0,
count: 10,
@ -46,17 +48,29 @@ async function waitJobs (
}
})
tasks.push(p)
tasks.push(jobPromise)
}
const p = server.debug.getDebug()
const debugPromise = server.debug.getDebug()
.then(obj => {
if (obj.activityPubMessagesWaiting !== 0) {
pendingRequests = true
}
})
tasks.push(debugPromise)
if (runnerJobs) {
const runnerJobsPromise = server.runnerJobs.list({ count: 100 })
.then(({ data }) => {
for (const job of data) {
if (job.state.id !== RunnerJobState.COMPLETED) {
pendingRequests = true
}
}
})
tasks.push(runnerJobsPromise)
}
tasks.push(p)
}
return tasks

View file

@ -8,9 +8,9 @@ import { CLICommand } from '../cli'
import { CustomPagesCommand } from '../custom-pages'
import { FeedCommand } from '../feeds'
import { LogsCommand } from '../logs'
import { SQLCommand } from '../miscs'
import { AbusesCommand } from '../moderation'
import { OverviewsCommand } from '../overviews'
import { RunnerJobsCommand, RunnerRegistrationTokensCommand, RunnersCommand } from '../runners'
import { SearchCommand } from '../search'
import { SocketIOCommand } from '../socket'
import {
@ -136,7 +136,6 @@ export class PeerTubeServer {
streamingPlaylists?: StreamingPlaylistsCommand
channels?: ChannelsCommand
comments?: CommentsCommand
sql?: SQLCommand
notifications?: NotificationsCommand
servers?: ServersCommand
login?: LoginCommand
@ -150,6 +149,10 @@ export class PeerTubeServer {
videoToken?: VideoTokenCommand
registrations?: RegistrationsCommand
runners?: RunnersCommand
runnerRegistrationTokens?: RunnerRegistrationTokensCommand
runnerJobs?: RunnerJobsCommand
constructor (options: { serverNumber: number } | { url: string }) {
if ((options as any).url) {
this.setUrl((options as any).url)
@ -311,14 +314,14 @@ export class PeerTubeServer {
})
}
async kill () {
if (!this.app) return
await this.sql.cleanup()
kill () {
if (!this.app) return Promise.resolve()
process.kill(-this.app.pid)
this.app = null
return Promise.resolve()
}
private randomServer () {
@ -420,7 +423,6 @@ export class PeerTubeServer {
this.streamingPlaylists = new StreamingPlaylistsCommand(this)
this.channels = new ChannelsCommand(this)
this.comments = new CommentsCommand(this)
this.sql = new SQLCommand(this)
this.notifications = new NotificationsCommand(this)
this.servers = new ServersCommand(this)
this.login = new LoginCommand(this)
@ -433,5 +435,9 @@ export class PeerTubeServer {
this.twoFactor = new TwoFactorCommand(this)
this.videoToken = new VideoTokenCommand(this)
this.registrations = new RegistrationsCommand(this)
this.runners = new RunnersCommand(this)
this.runnerRegistrationTokens = new RunnerRegistrationTokensCommand(this)
this.runnerJobs = new RunnerJobsCommand(this)
}
}

View file

@ -20,7 +20,7 @@ function createMultipleServers (totalServers: number, configOverride?: object, o
return Promise.all(serverPromises)
}
async function killallServers (servers: PeerTubeServer[]) {
function killallServers (servers: PeerTubeServer[]) {
return Promise.all(servers.map(s => s.kill()))
}

View file

@ -33,6 +33,7 @@ interface InternalCommonCommandOptions extends OverrideCommandOptions {
host?: string
headers?: { [ name: string ]: string }
requestType?: string
responseType?: string
xForwardedFor?: string
}
@ -169,7 +170,7 @@ abstract class AbstractCommand {
}
protected buildCommonRequestOptions (options: InternalCommonCommandOptions) {
const { url, path, redirects, contentType, accept, range, host, headers, requestType, xForwardedFor } = options
const { url, path, redirects, contentType, accept, range, host, headers, requestType, xForwardedFor, responseType } = options
return {
url: url ?? this.server.url,
@ -185,6 +186,7 @@ abstract class AbstractCommand {
accept,
headers,
type: requestType,
responseType,
xForwardedFor
}
}

View file

@ -12,4 +12,13 @@ export class SocketIOCommand extends AbstractCommand {
getLiveNotificationSocket () {
return io(this.server.url + '/live-videos')
}
getRunnersSocket (options: {
runnerToken: string
}) {
return io(this.server.url + '/runners', {
reconnection: false,
auth: { runnerToken: options.runnerToken }
})
}
}

View file

@ -121,7 +121,7 @@ export class LiveCommand extends AbstractCommand {
permanentLive: boolean
privacy?: VideoPrivacy
}) {
const { saveReplay, permanentLive, privacy } = options
const { saveReplay, permanentLive, privacy = VideoPrivacy.PUBLIC } = options
const { uuid } = await this.create({
...options,

View file

@ -13,7 +13,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand {
withRetry?: boolean // default false
currentRetry?: number
}) {
}): Promise<string> {
const { videoFileToken, reinjectVideoFileToken, withRetry, currentRetry = 1 } = options
try {
@ -54,6 +54,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand {
url: options.url,
range: options.range,
implicitToken: false,
responseType: 'application/octet-stream',
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}
@ -65,6 +66,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand {
...options,
url: options.url,
contentType: 'application/json',
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
}))