diff --git a/apps/peertube-runner/CHANGELOG.md b/apps/peertube-runner/CHANGELOG.md new file mode 100644 index 000000000..564f76a91 --- /dev/null +++ b/apps/peertube-runner/CHANGELOG.md @@ -0,0 +1,8 @@ +# Changelog + +## v0.1.0 + + * Requires Node 20 + * Introduce `list-jobs` command to list processing jobs + * Update dependencies + * Send last chunks/playlist content to correctly end the live diff --git a/apps/peertube-runner/src/peertube-runner.ts b/apps/peertube-runner/src/peertube-runner.ts index 25349cd20..05662028a 100644 --- a/apps/peertube-runner/src/peertube-runner.ts +++ b/apps/peertube-runner/src/peertube-runner.ts @@ -2,7 +2,7 @@ import { Command, InvalidArgumentError } from '@commander-js/extra-typings' import { RunnerJobType } from '@peertube/peertube-models' -import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' +import { listJobs, listRegistered, registerRunner, unregisterRunner } from './register/index.js' import { gracefulShutdown } from './register/shutdown.js' import { RunnerServer } from './server/index.js' import { getSupportedJobsList } from './server/shared/supported-job.js' @@ -99,6 +99,19 @@ program.command('list-registered') } }) +program.command('list-jobs') + .description('List processing jobs') + .option('--include-payload', 'Include job payload in the output') + .action(async options => { + try { + await listJobs({ includePayload: options.includePayload }) + } catch (err) { + console.error('Cannot list processing jobs.') + console.error(err) + process.exit(-1) + } + }) + program.command('graceful-shutdown') .description('Exit runner when all processing tasks are finished') .action(async () => { diff --git a/apps/peertube-runner/src/register/register.ts b/apps/peertube-runner/src/register/register.ts index e8af21661..f76790f9c 100644 --- a/apps/peertube-runner/src/register/register.ts +++ b/apps/peertube-runner/src/register/register.ts @@ -34,3 +34,14 @@ export async function listRegistered () { client.stop() } + +export async function listJobs (options: { + includePayload: boolean +}) { + const client = new IPCClient() + await client.run() + + await client.askListJobs(options) + + client.stop() +} diff --git a/apps/peertube-runner/src/server/process/shared/common.ts b/apps/peertube-runner/src/server/process/shared/common.ts index 52ea0a217..8e7ba4583 100644 --- a/apps/peertube-runner/src/server/process/shared/common.ts +++ b/apps/peertube-runner/src/server/process/shared/common.ts @@ -61,11 +61,13 @@ export function scheduleTranscodingProgress (options: { : 60000 const update = () => { + job.progress = progressGetter() || 0 + server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, - progress: progressGetter() + progress: job.progress }).catch(err => logger.error({ err }, 'Cannot send job progress')) } diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts index 6c58915ef..b897324a9 100644 --- a/apps/peertube-runner/src/server/server.ts +++ b/apps/peertube-runner/src/server/server.ts @@ -183,6 +183,19 @@ export class RunnerServer { // --------------------------------------------------------------------------- + listJobs () { + return { + concurrency: ConfigManager.Instance.getConfig().jobs.concurrency, + + processingJobs: this.processingJobs.map(j => ({ + serverUrl: j.server.url, + job: pick(j.job, [ 'type', 'startedAt', 'progress', 'payload' ]) + })) + } + } + + // --------------------------------------------------------------------------- + requestGracefulShutdown () { logger.info('Received graceful shutdown request') diff --git a/apps/peertube-runner/src/shared/ipc/ipc-client.ts b/apps/peertube-runner/src/shared/ipc/ipc-client.ts index 90575dbb4..7f7215dc8 100644 --- a/apps/peertube-runner/src/shared/ipc/ipc-client.ts +++ b/apps/peertube-runner/src/shared/ipc/ipc-client.ts @@ -1,8 +1,8 @@ +import { Client as NetIPC } from '@peertube/net-ipc' import CliTable3 from 'cli-table3' import { ensureDir } from 'fs-extra/esm' -import { Client as NetIPC } from '@peertube/net-ipc' import { ConfigManager } from '../config-manager.js' -import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js' +import { IPCRequest, IPCResponse, IPCResponseListJobs, IPCResponseListRegistered } from './shared/index.js' export class IPCClient { private netIPC: NetIPC @@ -65,7 +65,7 @@ export class IPCClient { type: 'list-registered' } - const { success, error, data } = await this.netIPC.request(req) as IPCResponse + const { success, error, data } = await this.netIPC.request(req) as IPCResponse if (!success) { console.error('Could not list registered PeerTube instances', error) return @@ -82,6 +82,39 @@ export class IPCClient { console.log(table.toString()) } + async askListJobs (options: { + includePayload: boolean + }) { + const req: IPCRequest = { + type: 'list-jobs' + } + + const { success, error, data } = await this.netIPC.request(req) as IPCResponse + if (!success) { + console.error('Could not list jobs', error) + return + } + + const head = [ 'instance', 'type', 'started', 'progress' ] + if (options.includePayload) head.push('payload') + + const table = new CliTable3({ + head, + wordWrap: true, + wrapOnWordBoundary: false + }) + + for (const { serverUrl, job } of data.processingJobs) { + const row = [ serverUrl, job.type, job.startedAt.toLocaleString(), `${job.progress}%` ] + if (options.includePayload) row.push(JSON.stringify(job.payload, undefined, 2)) + + table.push(row) + } + + console.log(`Processing ${data.processingJobs.length}/${data.concurrency} jobs`) + console.log(table.toString()) + } + // --------------------------------------------------------------------------- async askGracefulShutdown () { diff --git a/apps/peertube-runner/src/shared/ipc/ipc-server.ts b/apps/peertube-runner/src/shared/ipc/ipc-server.ts index ff03d5aef..c8d046193 100644 --- a/apps/peertube-runner/src/shared/ipc/ipc-server.ts +++ b/apps/peertube-runner/src/shared/ipc/ipc-server.ts @@ -46,6 +46,9 @@ export class IPCServer { case 'list-registered': return Promise.resolve(this.runnerServer.listRegistered()) + case 'list-jobs': + return Promise.resolve(this.runnerServer.listJobs()) + case 'graceful-shutdown': this.runnerServer.requestGracefulShutdown() return undefined diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts index 2ea1b71b2..a5e61fc65 100644 --- a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts @@ -2,7 +2,8 @@ export type IPCRequest = IPCRequestRegister | IPCRequestUnregister | IPCRequestListRegistered | - IPCRequestGracefulShutdown + IPCRequestGracefulShutdown | + IPCRequestListJobs export type IPCRequestRegister = { type: 'register' @@ -14,5 +15,6 @@ export type IPCRequestRegister = { export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string } export type IPCRequestListRegistered = { type: 'list-registered' } +export type IPCRequestListJobs = { type: 'list-jobs' } export type IPCRequestGracefulShutdown = { type: 'graceful-shutdown' } diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts index 475586778..dc707cad9 100644 --- a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts @@ -1,15 +1,24 @@ +import { RunnerJob } from '@peertube/peertube-models' + export type IPCResponse = { success: boolean error?: string data?: T } -export type IPCResponseData = - // list registered - { - servers: { - runnerName: string - runnerDescription: string - url: string - }[] - } +export type IPCResponseData = IPCResponseListRegistered | IPCResponseListJobs + +export type IPCResponseListRegistered = { + servers: { + runnerName: string + runnerDescription: string + url: string + }[] +} +export type IPCResponseListJobs = { + concurrency: number + processingJobs: { + serverUrl: string + job: Pick + }[] +} diff --git a/packages/tests/src/peertube-runner/live-transcoding.ts b/packages/tests/src/peertube-runner/live-transcoding.ts index cae259938..466d92a29 100644 --- a/packages/tests/src/peertube-runner/live-transcoding.ts +++ b/packages/tests/src/peertube-runner/live-transcoding.ts @@ -53,6 +53,13 @@ describe('Test Live transcoding in peertube-runner program', function () { transcoded: true }) + // Check jobs output + { + const jobsList = await peertubeRunner.listJobs() + expect(jobsList).to.contain(servers[0].url) + expect(jobsList).to.contain('live-rtmp-hls-transcoding') + } + await stopFfmpeg(ffmpegCommand) await waitUntilLiveWaitingOnAllServers(servers, video.uuid) diff --git a/packages/tests/src/shared/peertube-runner-process.ts b/packages/tests/src/shared/peertube-runner-process.ts index e215fa826..6ce0885c2 100644 --- a/packages/tests/src/shared/peertube-runner-process.ts +++ b/packages/tests/src/shared/peertube-runner-process.ts @@ -89,6 +89,13 @@ export class PeerTubeRunnerProcess { return stdout } + async listJobs () { + const args = [ 'list-jobs', ...this.buildIdArg() ] + const { stdout } = await this.runCommand(this.getRunnerPath(), args) + + return stdout + } + // --------------------------------------------------------------------------- gracefulShutdown () { diff --git a/support/doc/tools.md b/support/doc/tools.md index 39a9f1c5c..087042df6 100644 --- a/support/doc/tools.md +++ b/support/doc/tools.md @@ -359,6 +359,26 @@ sudo -u prunner peertube-runner list-registered ::: +### List jobs + +**Runner >= 0.1.0** + +To list jobs that are processed by the runner: + +::: code-group + +```bash [Shell] +peertube-runner list-jobs +peertube-runner list-jobs --include-payload +``` + +```bash [Systemd] +sudo -u prunner peertube-runner list-jobs +sudo -u prunner peertube-runner list-jobs --include-payload +``` + +::: + ### Graceful shutdown Ask the runner to shutdown when it has finished all of its current tasks: @@ -424,7 +444,7 @@ docker compose exec -u peertube peertube npm run parse-log -- --level info `--level` is optional and could be `info`/`warn`/`error` -You can also remove SQL or HTTP logs using `--not-tags` (PeerTube >= 3.2): +You can also remove SQL or HTTP logs using `--not-tags`: ::: code-group