mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-05 19:42:24 +02:00
server/server -> server/core
This commit is contained in:
parent
114327d4ce
commit
5a3d0650c9
838 changed files with 111 additions and 111 deletions
4
server/core/lib/live/index.ts
Normal file
4
server/core/lib/live/index.ts
Normal file
|
@ -0,0 +1,4 @@
|
|||
export * from './live-manager.js'
|
||||
export * from './live-quota-store.js'
|
||||
export * from './live-segment-sha-store.js'
|
||||
export * from './live-utils.js'
|
557
server/core/lib/live/live-manager.ts
Normal file
557
server/core/lib/live/live-manager.ts
Normal file
|
@ -0,0 +1,557 @@
|
|||
import { readdir, readFile } from 'fs/promises'
|
||||
import { createServer, Server } from 'net'
|
||||
import context from 'node-media-server/src/node_core_ctx.js'
|
||||
import nodeMediaServerLogger from 'node-media-server/src/node_core_logger.js'
|
||||
import NodeRtmpSession from 'node-media-server/src/node_rtmp_session.js'
|
||||
import { join } from 'path'
|
||||
import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
|
||||
import { pick, wait } from '@peertube/peertube-core-utils'
|
||||
import { LiveVideoError, LiveVideoErrorType, VideoState } from '@peertube/peertube-models'
|
||||
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
||||
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config.js'
|
||||
import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants.js'
|
||||
import { sequelizeTypescript } from '@server/initializers/database.js'
|
||||
import { RunnerJobModel } from '@server/models/runner/runner-job.js'
|
||||
import { UserModel } from '@server/models/user/user.js'
|
||||
import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting.js'
|
||||
import { VideoLiveSessionModel } from '@server/models/video/video-live-session.js'
|
||||
import { VideoLiveModel } from '@server/models/video/video-live.js'
|
||||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js'
|
||||
import { VideoModel } from '@server/models/video/video.js'
|
||||
import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models/index.js'
|
||||
import {
|
||||
ffprobePromise,
|
||||
getVideoStreamBitrate,
|
||||
getVideoStreamDimensionsInfo,
|
||||
getVideoStreamFPS,
|
||||
hasAudioStream
|
||||
} from '@peertube/peertube-ffmpeg'
|
||||
import { federateVideoIfNeeded } from '../activitypub/videos/index.js'
|
||||
import { JobQueue } from '../job-queue/index.js'
|
||||
import { getLiveReplayBaseDirectory } from '../paths.js'
|
||||
import { PeerTubeSocket } from '../peertube-socket.js'
|
||||
import { Hooks } from '../plugins/hooks.js'
|
||||
import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions.js'
|
||||
import { LiveQuotaStore } from './live-quota-store.js'
|
||||
import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils.js'
|
||||
import { MuxingSession } from './shared/index.js'
|
||||
|
||||
// Disable node media server logs
|
||||
nodeMediaServerLogger.setLogType(0)
|
||||
|
||||
const config = {
|
||||
rtmp: {
|
||||
port: CONFIG.LIVE.RTMP.PORT,
|
||||
chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
|
||||
gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
|
||||
ping: VIDEO_LIVE.RTMP.PING,
|
||||
ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
|
||||
}
|
||||
}
|
||||
|
||||
const lTags = loggerTagsFactory('live')
|
||||
|
||||
class LiveManager {
|
||||
|
||||
private static instance: LiveManager
|
||||
|
||||
private readonly muxingSessions = new Map<string, MuxingSession>()
|
||||
private readonly videoSessions = new Map<string, string>()
|
||||
|
||||
private rtmpServer: Server
|
||||
private rtmpsServer: ServerTLS
|
||||
|
||||
private running = false
|
||||
|
||||
private constructor () {
|
||||
}
|
||||
|
||||
init () {
|
||||
const events = this.getContext().nodeEvent
|
||||
events.on('postPublish', (sessionId: string, streamPath: string) => {
|
||||
logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
|
||||
|
||||
const splittedPath = streamPath.split('/')
|
||||
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
|
||||
logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
|
||||
return this.abortSession(sessionId)
|
||||
}
|
||||
|
||||
const session = this.getContext().sessions.get(sessionId)
|
||||
const inputLocalUrl = session.inputOriginLocalUrl + streamPath
|
||||
const inputPublicUrl = session.inputOriginPublicUrl + streamPath
|
||||
|
||||
this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] })
|
||||
.catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
|
||||
})
|
||||
|
||||
events.on('donePublish', sessionId => {
|
||||
logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
|
||||
|
||||
// Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU)
|
||||
setTimeout(() => this.abortSession(sessionId), 2000)
|
||||
})
|
||||
|
||||
registerConfigChangedHandler(() => {
|
||||
if (!this.running && CONFIG.LIVE.ENABLED === true) {
|
||||
this.run().catch(err => logger.error('Cannot run live server.', { err }))
|
||||
return
|
||||
}
|
||||
|
||||
if (this.running && CONFIG.LIVE.ENABLED === false) {
|
||||
this.stop()
|
||||
}
|
||||
})
|
||||
|
||||
// Cleanup broken lives, that were terminated by a server restart for example
|
||||
this.handleBrokenLives()
|
||||
.catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
|
||||
}
|
||||
|
||||
async run () {
|
||||
this.running = true
|
||||
|
||||
if (CONFIG.LIVE.RTMP.ENABLED) {
|
||||
logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())
|
||||
|
||||
this.rtmpServer = createServer(socket => {
|
||||
const session = new NodeRtmpSession(config, socket)
|
||||
|
||||
session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
|
||||
session.inputOriginPublicUrl = WEBSERVER.RTMP_URL
|
||||
session.run()
|
||||
})
|
||||
|
||||
this.rtmpServer.on('error', err => {
|
||||
logger.error('Cannot run RTMP server.', { err, ...lTags() })
|
||||
})
|
||||
|
||||
this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME)
|
||||
}
|
||||
|
||||
if (CONFIG.LIVE.RTMPS.ENABLED) {
|
||||
logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())
|
||||
|
||||
const [ key, cert ] = await Promise.all([
|
||||
readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
|
||||
readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
|
||||
])
|
||||
const serverOptions = { key, cert }
|
||||
|
||||
this.rtmpsServer = createServerTLS(serverOptions, socket => {
|
||||
const session = new NodeRtmpSession(config, socket)
|
||||
|
||||
session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
|
||||
session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL
|
||||
session.run()
|
||||
})
|
||||
|
||||
this.rtmpsServer.on('error', err => {
|
||||
logger.error('Cannot run RTMPS server.', { err, ...lTags() })
|
||||
})
|
||||
|
||||
this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME)
|
||||
}
|
||||
}
|
||||
|
||||
stop () {
|
||||
this.running = false
|
||||
|
||||
if (this.rtmpServer) {
|
||||
logger.info('Stopping RTMP server.', lTags())
|
||||
|
||||
this.rtmpServer.close()
|
||||
this.rtmpServer = undefined
|
||||
}
|
||||
|
||||
if (this.rtmpsServer) {
|
||||
logger.info('Stopping RTMPS server.', lTags())
|
||||
|
||||
this.rtmpsServer.close()
|
||||
this.rtmpsServer = undefined
|
||||
}
|
||||
|
||||
// Sessions is an object
|
||||
this.getContext().sessions.forEach((session: any) => {
|
||||
if (session instanceof NodeRtmpSession) {
|
||||
session.stop()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
isRunning () {
|
||||
return !!this.rtmpServer
|
||||
}
|
||||
|
||||
hasSession (sessionId: string) {
|
||||
return this.getContext().sessions.has(sessionId)
|
||||
}
|
||||
|
||||
stopSessionOf (videoUUID: string, error: LiveVideoErrorType | null) {
|
||||
const sessionId = this.videoSessions.get(videoUUID)
|
||||
if (!sessionId) {
|
||||
logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID))
|
||||
return
|
||||
}
|
||||
|
||||
logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })
|
||||
|
||||
this.saveEndingSession(videoUUID, error)
|
||||
.catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))
|
||||
|
||||
this.videoSessions.delete(videoUUID)
|
||||
this.abortSession(sessionId)
|
||||
}
|
||||
|
||||
private getContext () {
|
||||
return context
|
||||
}
|
||||
|
||||
private abortSession (sessionId: string) {
|
||||
const session = this.getContext().sessions.get(sessionId)
|
||||
if (session) {
|
||||
session.stop()
|
||||
this.getContext().sessions.delete(sessionId)
|
||||
}
|
||||
|
||||
const muxingSession = this.muxingSessions.get(sessionId)
|
||||
if (muxingSession) {
|
||||
// Muxing session will fire and event so we correctly cleanup the session
|
||||
muxingSession.abort()
|
||||
|
||||
this.muxingSessions.delete(sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
private async handleSession (options: {
|
||||
sessionId: string
|
||||
inputLocalUrl: string
|
||||
inputPublicUrl: string
|
||||
streamKey: string
|
||||
}) {
|
||||
const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options
|
||||
|
||||
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
|
||||
if (!videoLive) {
|
||||
logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
|
||||
return this.abortSession(sessionId)
|
||||
}
|
||||
|
||||
const video = videoLive.Video
|
||||
if (video.isBlacklisted()) {
|
||||
logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
|
||||
return this.abortSession(sessionId)
|
||||
}
|
||||
|
||||
if (this.videoSessions.has(video.uuid)) {
|
||||
logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
|
||||
return this.abortSession(sessionId)
|
||||
}
|
||||
|
||||
// Cleanup old potential live (could happen with a permanent live)
|
||||
const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
|
||||
if (oldStreamingPlaylist) {
|
||||
if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
|
||||
|
||||
await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
|
||||
}
|
||||
|
||||
this.videoSessions.set(video.uuid, sessionId)
|
||||
|
||||
const now = Date.now()
|
||||
const probe = await ffprobePromise(inputLocalUrl)
|
||||
|
||||
const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([
|
||||
getVideoStreamDimensionsInfo(inputLocalUrl, probe),
|
||||
getVideoStreamFPS(inputLocalUrl, probe),
|
||||
getVideoStreamBitrate(inputLocalUrl, probe),
|
||||
hasAudioStream(inputLocalUrl, probe)
|
||||
])
|
||||
|
||||
logger.info(
|
||||
'%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
|
||||
inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
|
||||
)
|
||||
|
||||
const allResolutions = await Hooks.wrapObject(
|
||||
this.buildAllResolutionsToTranscode(resolution, hasAudio),
|
||||
'filter:transcoding.auto.resolutions-to-transcode.result',
|
||||
{ video }
|
||||
)
|
||||
|
||||
logger.info(
|
||||
'Handling live video of original resolution %d.', resolution,
|
||||
{ allResolutions, ...lTags(sessionId, video.uuid) }
|
||||
)
|
||||
|
||||
return this.runMuxingSession({
|
||||
sessionId,
|
||||
videoLive,
|
||||
|
||||
inputLocalUrl,
|
||||
inputPublicUrl,
|
||||
fps,
|
||||
bitrate,
|
||||
ratio,
|
||||
allResolutions,
|
||||
hasAudio
|
||||
})
|
||||
}
|
||||
|
||||
private async runMuxingSession (options: {
|
||||
sessionId: string
|
||||
videoLive: MVideoLiveVideoWithSetting
|
||||
|
||||
inputLocalUrl: string
|
||||
inputPublicUrl: string
|
||||
|
||||
fps: number
|
||||
bitrate: number
|
||||
ratio: number
|
||||
allResolutions: number[]
|
||||
hasAudio: boolean
|
||||
}) {
|
||||
const { sessionId, videoLive } = options
|
||||
const videoUUID = videoLive.Video.uuid
|
||||
const localLTags = lTags(sessionId, videoUUID)
|
||||
|
||||
const liveSession = await this.saveStartingSession(videoLive)
|
||||
|
||||
const user = await UserModel.loadByLiveId(videoLive.id)
|
||||
LiveQuotaStore.Instance.addNewLive(user.id, sessionId)
|
||||
|
||||
const muxingSession = new MuxingSession({
|
||||
context: this.getContext(),
|
||||
sessionId,
|
||||
videoLive,
|
||||
user,
|
||||
|
||||
...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
|
||||
})
|
||||
|
||||
muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
|
||||
|
||||
muxingSession.on('bad-socket-health', ({ videoUUID }) => {
|
||||
logger.error(
|
||||
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
|
||||
' Stopping session of video %s.', videoUUID,
|
||||
localLTags
|
||||
)
|
||||
|
||||
this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH)
|
||||
})
|
||||
|
||||
muxingSession.on('duration-exceeded', ({ videoUUID }) => {
|
||||
logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
|
||||
|
||||
this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED)
|
||||
})
|
||||
|
||||
muxingSession.on('quota-exceeded', ({ videoUUID }) => {
|
||||
logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
|
||||
|
||||
this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED)
|
||||
})
|
||||
|
||||
muxingSession.on('transcoding-error', ({ videoUUID }) => {
|
||||
this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR)
|
||||
})
|
||||
|
||||
muxingSession.on('transcoding-end', ({ videoUUID }) => {
|
||||
this.onMuxingFFmpegEnd(videoUUID, sessionId)
|
||||
})
|
||||
|
||||
muxingSession.on('after-cleanup', ({ videoUUID }) => {
|
||||
this.muxingSessions.delete(sessionId)
|
||||
|
||||
LiveQuotaStore.Instance.removeLive(user.id, sessionId)
|
||||
|
||||
muxingSession.destroy()
|
||||
|
||||
return this.onAfterMuxingCleanup({ videoUUID, liveSession })
|
||||
.catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
|
||||
})
|
||||
|
||||
this.muxingSessions.set(sessionId, muxingSession)
|
||||
|
||||
muxingSession.runMuxing()
|
||||
.catch(err => {
|
||||
logger.error('Cannot run muxing.', { err, ...localLTags })
|
||||
this.abortSession(sessionId)
|
||||
})
|
||||
}
|
||||
|
||||
private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) {
|
||||
const videoId = live.videoId
|
||||
|
||||
try {
|
||||
const video = await VideoModel.loadFull(videoId)
|
||||
|
||||
logger.info('Will publish and federate live %s.', video.url, localLTags)
|
||||
|
||||
video.state = VideoState.PUBLISHED
|
||||
video.publishedAt = new Date()
|
||||
await video.save()
|
||||
|
||||
live.Video = video
|
||||
|
||||
await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
|
||||
|
||||
try {
|
||||
await federateVideoIfNeeded(video, false)
|
||||
} catch (err) {
|
||||
logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
|
||||
}
|
||||
|
||||
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
|
||||
|
||||
Hooks.runAction('action:live.video.state.updated', { video })
|
||||
} catch (err) {
|
||||
logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
|
||||
}
|
||||
}
|
||||
|
||||
private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
|
||||
// Session already cleaned up
|
||||
if (!this.videoSessions.has(videoUUID)) return
|
||||
|
||||
this.videoSessions.delete(videoUUID)
|
||||
|
||||
this.saveEndingSession(videoUUID, null)
|
||||
.catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
|
||||
}
|
||||
|
||||
private async onAfterMuxingCleanup (options: {
|
||||
videoUUID: string
|
||||
liveSession?: MVideoLiveSession
|
||||
cleanupNow?: boolean // Default false
|
||||
}) {
|
||||
const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options
|
||||
|
||||
logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))
|
||||
|
||||
try {
|
||||
const fullVideo = await VideoModel.loadFull(videoUUID)
|
||||
if (!fullVideo) return
|
||||
|
||||
const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
|
||||
|
||||
const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id)
|
||||
|
||||
// On server restart during a live
|
||||
if (!liveSession.endDate) {
|
||||
liveSession.endDate = new Date()
|
||||
await liveSession.save()
|
||||
}
|
||||
|
||||
JobQueue.Instance.createJobAsync({
|
||||
type: 'video-live-ending',
|
||||
payload: {
|
||||
videoId: fullVideo.id,
|
||||
|
||||
replayDirectory: live.saveReplay
|
||||
? await this.findReplayDirectory(fullVideo)
|
||||
: undefined,
|
||||
|
||||
liveSessionId: liveSession.id,
|
||||
streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
|
||||
|
||||
publishedAt: fullVideo.publishedAt.toISOString()
|
||||
},
|
||||
|
||||
delay: cleanupNow
|
||||
? 0
|
||||
: VIDEO_LIVE.CLEANUP_DELAY
|
||||
})
|
||||
|
||||
fullVideo.state = live.permanentLive
|
||||
? VideoState.WAITING_FOR_LIVE
|
||||
: VideoState.LIVE_ENDED
|
||||
|
||||
await fullVideo.save()
|
||||
|
||||
PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
|
||||
|
||||
await federateVideoIfNeeded(fullVideo, false)
|
||||
|
||||
Hooks.runAction('action:live.video.state.updated', { video: fullVideo })
|
||||
} catch (err) {
|
||||
logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
|
||||
}
|
||||
}
|
||||
|
||||
private async handleBrokenLives () {
|
||||
await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })
|
||||
|
||||
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
|
||||
|
||||
for (const uuid of videoUUIDs) {
|
||||
await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
|
||||
}
|
||||
}
|
||||
|
||||
private async findReplayDirectory (video: MVideo) {
|
||||
const directory = getLiveReplayBaseDirectory(video)
|
||||
const files = await readdir(directory)
|
||||
|
||||
if (files.length === 0) return undefined
|
||||
|
||||
return join(directory, files.sort().reverse()[0])
|
||||
}
|
||||
|
||||
private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) {
|
||||
const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION
|
||||
|
||||
const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
|
||||
? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio })
|
||||
: []
|
||||
|
||||
if (resolutionsEnabled.length === 0) {
|
||||
return [ originResolution ]
|
||||
}
|
||||
|
||||
return resolutionsEnabled
|
||||
}
|
||||
|
||||
private async saveStartingSession (videoLive: MVideoLiveVideoWithSetting) {
|
||||
const replaySettings = videoLive.saveReplay
|
||||
? new VideoLiveReplaySettingModel({
|
||||
privacy: videoLive.ReplaySetting.privacy
|
||||
})
|
||||
: null
|
||||
|
||||
return sequelizeTypescript.transaction(async t => {
|
||||
if (videoLive.saveReplay) {
|
||||
await replaySettings.save({ transaction: t })
|
||||
}
|
||||
|
||||
return VideoLiveSessionModel.create({
|
||||
startDate: new Date(),
|
||||
liveVideoId: videoLive.videoId,
|
||||
saveReplay: videoLive.saveReplay,
|
||||
replaySettingId: videoLive.saveReplay ? replaySettings.id : null,
|
||||
endingProcessed: false
|
||||
}, { transaction: t })
|
||||
})
|
||||
}
|
||||
|
||||
private async saveEndingSession (videoUUID: string, error: LiveVideoErrorType | null) {
|
||||
const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
|
||||
if (!liveSession) return
|
||||
|
||||
liveSession.endDate = new Date()
|
||||
liveSession.error = error
|
||||
|
||||
return liveSession.save()
|
||||
}
|
||||
|
||||
static get Instance () {
|
||||
return this.instance || (this.instance = new this())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
LiveManager
|
||||
}
|
48
server/core/lib/live/live-quota-store.ts
Normal file
48
server/core/lib/live/live-quota-store.ts
Normal file
|
@ -0,0 +1,48 @@
|
|||
class LiveQuotaStore {
|
||||
|
||||
private static instance: LiveQuotaStore
|
||||
|
||||
private readonly livesPerUser = new Map<number, { sessionId: string, size: number }[]>()
|
||||
|
||||
private constructor () {
|
||||
}
|
||||
|
||||
addNewLive (userId: number, sessionId: string) {
|
||||
if (!this.livesPerUser.has(userId)) {
|
||||
this.livesPerUser.set(userId, [])
|
||||
}
|
||||
|
||||
const currentUserLive = { sessionId, size: 0 }
|
||||
const livesOfUser = this.livesPerUser.get(userId)
|
||||
livesOfUser.push(currentUserLive)
|
||||
}
|
||||
|
||||
removeLive (userId: number, sessionId: string) {
|
||||
const newLivesPerUser = this.livesPerUser.get(userId)
|
||||
.filter(o => o.sessionId !== sessionId)
|
||||
|
||||
this.livesPerUser.set(userId, newLivesPerUser)
|
||||
}
|
||||
|
||||
addQuotaTo (userId: number, sessionId: string, size: number) {
|
||||
const lives = this.livesPerUser.get(userId)
|
||||
const live = lives.find(l => l.sessionId === sessionId)
|
||||
|
||||
live.size += size
|
||||
}
|
||||
|
||||
getLiveQuotaOf (userId: number) {
|
||||
const currentLives = this.livesPerUser.get(userId)
|
||||
if (!currentLives) return 0
|
||||
|
||||
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
|
||||
}
|
||||
|
||||
static get Instance () {
|
||||
return this.instance || (this.instance = new this())
|
||||
}
|
||||
}
|
||||
|
||||
export {
|
||||
LiveQuotaStore
|
||||
}
|
96
server/core/lib/live/live-segment-sha-store.ts
Normal file
96
server/core/lib/live/live-segment-sha-store.ts
Normal file
|
@ -0,0 +1,96 @@
|
|||
import { writeJson } from 'fs-extra/esm'
|
||||
import { rename } from 'fs/promises'
|
||||
import PQueue from 'p-queue'
|
||||
import { basename } from 'path'
|
||||
import { mapToJSON } from '@server/helpers/core-utils.js'
|
||||
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
||||
import { MStreamingPlaylistVideo } from '@server/types/models/index.js'
|
||||
import { buildSha256Segment } from '../hls.js'
|
||||
import { storeHLSFileFromPath } from '../object-storage/index.js'
|
||||
|
||||
const lTags = loggerTagsFactory('live')
|
||||
|
||||
class LiveSegmentShaStore {
|
||||
|
||||
private readonly segmentsSha256 = new Map<string, string>()
|
||||
|
||||
private readonly videoUUID: string
|
||||
|
||||
private readonly sha256Path: string
|
||||
private readonly sha256PathTMP: string
|
||||
|
||||
private readonly streamingPlaylist: MStreamingPlaylistVideo
|
||||
private readonly sendToObjectStorage: boolean
|
||||
private readonly writeQueue = new PQueue({ concurrency: 1 })
|
||||
|
||||
constructor (options: {
|
||||
videoUUID: string
|
||||
sha256Path: string
|
||||
streamingPlaylist: MStreamingPlaylistVideo
|
||||
sendToObjectStorage: boolean
|
||||
}) {
|
||||
this.videoUUID = options.videoUUID
|
||||
|
||||
this.sha256Path = options.sha256Path
|
||||
this.sha256PathTMP = options.sha256Path + '.tmp'
|
||||
|
||||
this.streamingPlaylist = options.streamingPlaylist
|
||||
this.sendToObjectStorage = options.sendToObjectStorage
|
||||
}
|
||||
|
||||
async addSegmentSha (segmentPath: string) {
|
||||
logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
|
||||
|
||||
const shaResult = await buildSha256Segment(segmentPath)
|
||||
|
||||
const segmentName = basename(segmentPath)
|
||||
this.segmentsSha256.set(segmentName, shaResult)
|
||||
|
||||
try {
|
||||
await this.writeToDisk()
|
||||
} catch (err) {
|
||||
logger.error('Cannot write sha segments to disk.', { err })
|
||||
}
|
||||
}
|
||||
|
||||
async removeSegmentSha (segmentPath: string) {
|
||||
const segmentName = basename(segmentPath)
|
||||
|
||||
logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
|
||||
|
||||
if (!this.segmentsSha256.has(segmentName)) {
|
||||
logger.warn(
|
||||
'Unknown segment in live segment hash store for video %s and segment %s.',
|
||||
this.videoUUID, segmentPath, lTags(this.videoUUID)
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
this.segmentsSha256.delete(segmentName)
|
||||
|
||||
await this.writeToDisk()
|
||||
}
|
||||
|
||||
private writeToDisk () {
|
||||
return this.writeQueue.add(async () => {
|
||||
logger.debug(`Writing segment sha JSON ${this.sha256Path} of ${this.videoUUID} on disk.`, lTags(this.videoUUID))
|
||||
|
||||
// Atomic write: use rename instead of move that is not atomic
|
||||
await writeJson(this.sha256PathTMP, mapToJSON(this.segmentsSha256))
|
||||
await rename(this.sha256PathTMP, this.sha256Path)
|
||||
|
||||
if (this.sendToObjectStorage) {
|
||||
const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
|
||||
|
||||
if (this.streamingPlaylist.segmentsSha256Url !== url) {
|
||||
this.streamingPlaylist.segmentsSha256Url = url
|
||||
await this.streamingPlaylist.save()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export {
|
||||
LiveSegmentShaStore
|
||||
}
|
100
server/core/lib/live/live-utils.ts
Normal file
100
server/core/lib/live/live-utils.ts
Normal file
|
@ -0,0 +1,100 @@
|
|||
import { pathExists, remove } from 'fs-extra/esm'
|
||||
import { readdir } from 'fs/promises'
|
||||
import { basename, join } from 'path'
|
||||
import { LiveVideoLatencyMode, LiveVideoLatencyModeType, VideoStorage } from '@peertube/peertube-models'
|
||||
import { logger } from '@server/helpers/logger.js'
|
||||
import { VIDEO_LIVE } from '@server/initializers/constants.js'
|
||||
import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models/index.js'
|
||||
import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage/index.js'
|
||||
import { getLiveDirectory } from '../paths.js'
|
||||
|
||||
function buildConcatenatedName (segmentOrPlaylistPath: string) {
|
||||
const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
|
||||
|
||||
return 'concat-' + num[1] + '.ts'
|
||||
}
|
||||
|
||||
async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
|
||||
await cleanupTMPLiveFiles(video, streamingPlaylist)
|
||||
|
||||
await streamingPlaylist.destroy()
|
||||
}
|
||||
|
||||
async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
|
||||
const hlsDirectory = getLiveDirectory(video)
|
||||
|
||||
// We uploaded files to object storage too, remove them
|
||||
if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
|
||||
await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
|
||||
}
|
||||
|
||||
await remove(hlsDirectory)
|
||||
|
||||
await streamingPlaylist.destroy()
|
||||
}
|
||||
|
||||
async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
|
||||
await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
|
||||
|
||||
await cleanupTMPLiveFilesFromFilesystem(video)
|
||||
}
|
||||
|
||||
function getLiveSegmentTime (latencyMode: LiveVideoLatencyModeType) {
|
||||
if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) {
|
||||
return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY
|
||||
}
|
||||
|
||||
return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY
|
||||
}
|
||||
|
||||
export {
|
||||
cleanupAndDestroyPermanentLive,
|
||||
cleanupUnsavedNormalLive,
|
||||
cleanupTMPLiveFiles,
|
||||
getLiveSegmentTime,
|
||||
buildConcatenatedName
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function isTMPLiveFile (name: string) {
|
||||
return name.endsWith('.ts') ||
|
||||
name.endsWith('.m3u8') ||
|
||||
name.endsWith('.json') ||
|
||||
name.endsWith('.mpd') ||
|
||||
name.endsWith('.m4s') ||
|
||||
name.endsWith('.tmp')
|
||||
}
|
||||
|
||||
async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
|
||||
const hlsDirectory = getLiveDirectory(video)
|
||||
|
||||
if (!await pathExists(hlsDirectory)) return
|
||||
|
||||
logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
|
||||
|
||||
const files = await readdir(hlsDirectory)
|
||||
|
||||
for (const filename of files) {
|
||||
if (isTMPLiveFile(filename)) {
|
||||
const p = join(hlsDirectory, filename)
|
||||
|
||||
remove(p)
|
||||
.catch(err => logger.error('Cannot remove %s.', p, { err }))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
|
||||
if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
|
||||
|
||||
logger.info('Cleanup TMP live files from object storage for %s.', streamingPlaylist.Video.uuid)
|
||||
|
||||
const keys = await listHLSFileKeysOf(streamingPlaylist)
|
||||
|
||||
for (const key of keys) {
|
||||
if (isTMPLiveFile(key)) {
|
||||
await removeHLSFileObjectStorageByFullKey(key)
|
||||
}
|
||||
}
|
||||
}
|
1
server/core/lib/live/shared/index.ts
Normal file
1
server/core/lib/live/shared/index.ts
Normal file
|
@ -0,0 +1 @@
|
|||
export * from './muxing-session.js'
|
519
server/core/lib/live/shared/muxing-session.ts
Normal file
519
server/core/lib/live/shared/muxing-session.ts
Normal file
|
@ -0,0 +1,519 @@
|
|||
import Bluebird from 'bluebird'
|
||||
import { FSWatcher, watch } from 'chokidar'
|
||||
import { EventEmitter } from 'events'
|
||||
import { ensureDir } from 'fs-extra/esm'
|
||||
import { appendFile, readFile, stat } from 'fs/promises'
|
||||
import memoizee from 'memoizee'
|
||||
import PQueue from 'p-queue'
|
||||
import { basename, join } from 'path'
|
||||
import { computeOutputFPS } from '@server/helpers/ffmpeg/index.js'
|
||||
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger.js'
|
||||
import { CONFIG } from '@server/initializers/config.js'
|
||||
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants.js'
|
||||
import { removeHLSFileObjectStorageByPath, storeHLSFileFromContent, storeHLSFileFromPath } from '@server/lib/object-storage/index.js'
|
||||
import { VideoFileModel } from '@server/models/video/video-file.js'
|
||||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js'
|
||||
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models/index.js'
|
||||
import { VideoStorage, VideoStreamingPlaylistType } from '@peertube/peertube-models'
|
||||
import {
|
||||
generateHLSMasterPlaylistFilename,
|
||||
generateHlsSha256SegmentsFilename,
|
||||
getLiveDirectory,
|
||||
getLiveReplayBaseDirectory
|
||||
} from '../../paths.js'
|
||||
import { isAbleToUploadVideo } from '../../user.js'
|
||||
import { LiveQuotaStore } from '../live-quota-store.js'
|
||||
import { LiveSegmentShaStore } from '../live-segment-sha-store.js'
|
||||
import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils.js'
|
||||
import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper/index.js'
|
||||
|
||||
interface MuxingSessionEvents {
|
||||
'live-ready': (options: { videoUUID: string }) => void
|
||||
|
||||
'bad-socket-health': (options: { videoUUID: string }) => void
|
||||
'duration-exceeded': (options: { videoUUID: string }) => void
|
||||
'quota-exceeded': (options: { videoUUID: string }) => void
|
||||
|
||||
'transcoding-end': (options: { videoUUID: string }) => void
|
||||
'transcoding-error': (options: { videoUUID: string }) => void
|
||||
|
||||
'after-cleanup': (options: { videoUUID: string }) => void
|
||||
}
|
||||
|
||||
declare interface MuxingSession {
|
||||
on<U extends keyof MuxingSessionEvents>(
|
||||
event: U, listener: MuxingSessionEvents[U]
|
||||
): this
|
||||
|
||||
emit<U extends keyof MuxingSessionEvents>(
|
||||
event: U, ...args: Parameters<MuxingSessionEvents[U]>
|
||||
): boolean
|
||||
}
|
||||
|
||||
class MuxingSession extends EventEmitter {
|
||||
|
||||
private transcodingWrapper: AbstractTranscodingWrapper
|
||||
|
||||
private readonly context: any
|
||||
private readonly user: MUserId
|
||||
private readonly sessionId: string
|
||||
private readonly videoLive: MVideoLiveVideo
|
||||
|
||||
private readonly inputLocalUrl: string
|
||||
private readonly inputPublicUrl: string
|
||||
|
||||
private readonly fps: number
|
||||
private readonly allResolutions: number[]
|
||||
|
||||
private readonly bitrate: number
|
||||
private readonly ratio: number
|
||||
|
||||
private readonly hasAudio: boolean
|
||||
|
||||
private readonly videoUUID: string
|
||||
private readonly saveReplay: boolean
|
||||
|
||||
private readonly outDirectory: string
|
||||
private readonly replayDirectory: string
|
||||
|
||||
private readonly lTags: LoggerTagsFn
|
||||
|
||||
// Path -> Queue
|
||||
private readonly objectStorageSendQueues = new Map<string, PQueue>()
|
||||
|
||||
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
|
||||
|
||||
private streamingPlaylist: MStreamingPlaylistVideo
|
||||
private liveSegmentShaStore: LiveSegmentShaStore
|
||||
|
||||
private filesWatcher: FSWatcher
|
||||
|
||||
private masterPlaylistCreated = false
|
||||
private liveReady = false
|
||||
|
||||
private aborted = false
|
||||
|
||||
private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
|
||||
return isAbleToUploadVideo(userId, 1000)
|
||||
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
|
||||
|
||||
private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => {
|
||||
return this.hasClientSocketInBadHealth(sessionId)
|
||||
}, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
|
||||
|
||||
constructor (options: {
|
||||
context: any
|
||||
user: MUserId
|
||||
sessionId: string
|
||||
videoLive: MVideoLiveVideo
|
||||
|
||||
inputLocalUrl: string
|
||||
inputPublicUrl: string
|
||||
|
||||
fps: number
|
||||
bitrate: number
|
||||
ratio: number
|
||||
allResolutions: number[]
|
||||
hasAudio: boolean
|
||||
}) {
|
||||
super()
|
||||
|
||||
this.context = options.context
|
||||
this.user = options.user
|
||||
this.sessionId = options.sessionId
|
||||
this.videoLive = options.videoLive
|
||||
|
||||
this.inputLocalUrl = options.inputLocalUrl
|
||||
this.inputPublicUrl = options.inputPublicUrl
|
||||
|
||||
this.fps = options.fps
|
||||
|
||||
this.bitrate = options.bitrate
|
||||
this.ratio = options.ratio
|
||||
|
||||
this.hasAudio = options.hasAudio
|
||||
|
||||
this.allResolutions = options.allResolutions
|
||||
|
||||
this.videoUUID = this.videoLive.Video.uuid
|
||||
|
||||
this.saveReplay = this.videoLive.saveReplay
|
||||
|
||||
this.outDirectory = getLiveDirectory(this.videoLive.Video)
|
||||
this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
|
||||
|
||||
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
|
||||
}
|
||||
|
||||
async runMuxing () {
|
||||
this.streamingPlaylist = await this.createLivePlaylist()
|
||||
|
||||
this.createLiveShaStore()
|
||||
this.createFiles()
|
||||
|
||||
await this.prepareDirectories()
|
||||
|
||||
this.transcodingWrapper = this.buildTranscodingWrapper()
|
||||
|
||||
this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
|
||||
this.transcodingWrapper.on('error', () => this.onTranscodingError())
|
||||
|
||||
await this.transcodingWrapper.run()
|
||||
|
||||
this.filesWatcher = watch(this.outDirectory, { depth: 0 })
|
||||
|
||||
this.watchMasterFile()
|
||||
this.watchTSFiles()
|
||||
}
|
||||
|
||||
abort () {
|
||||
if (!this.transcodingWrapper) return
|
||||
|
||||
this.aborted = true
|
||||
this.transcodingWrapper.abort()
|
||||
}
|
||||
|
||||
destroy () {
|
||||
this.removeAllListeners()
|
||||
this.isAbleToUploadVideoWithCache.clear()
|
||||
this.hasClientSocketInBadHealthWithCache.clear()
|
||||
}
|
||||
|
||||
private watchMasterFile () {
|
||||
this.filesWatcher.on('add', async path => {
|
||||
if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return
|
||||
if (this.masterPlaylistCreated === true) return
|
||||
|
||||
try {
|
||||
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
|
||||
const masterContent = await readFile(path, 'utf-8')
|
||||
logger.debug('Uploading live master playlist on object storage for %s', this.videoUUID, { masterContent, ...this.lTags() })
|
||||
|
||||
const url = await storeHLSFileFromContent(this.streamingPlaylist, this.streamingPlaylist.playlistFilename, masterContent)
|
||||
|
||||
this.streamingPlaylist.playlistUrl = url
|
||||
}
|
||||
|
||||
this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions)
|
||||
|
||||
await this.streamingPlaylist.save()
|
||||
} catch (err) {
|
||||
logger.error('Cannot update streaming playlist.', { err, ...this.lTags() })
|
||||
}
|
||||
|
||||
this.masterPlaylistCreated = true
|
||||
|
||||
logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
|
||||
})
|
||||
}
|
||||
|
||||
private watchTSFiles () {
|
||||
const startStreamDateTime = new Date().getTime()
|
||||
|
||||
const addHandler = async (segmentPath: string) => {
|
||||
if (segmentPath.endsWith('.ts') !== true) return
|
||||
|
||||
logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
|
||||
|
||||
const playlistId = this.getPlaylistIdFromTS(segmentPath)
|
||||
|
||||
const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
|
||||
this.processSegments(segmentsToProcess)
|
||||
|
||||
this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
|
||||
|
||||
if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
|
||||
this.emit('bad-socket-health', { videoUUID: this.videoUUID })
|
||||
return
|
||||
}
|
||||
|
||||
// Duration constraint check
|
||||
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
|
||||
this.emit('duration-exceeded', { videoUUID: this.videoUUID })
|
||||
return
|
||||
}
|
||||
|
||||
// Check user quota if the user enabled replay saving
|
||||
if (await this.isQuotaExceeded(segmentPath) === true) {
|
||||
this.emit('quota-exceeded', { videoUUID: this.videoUUID })
|
||||
}
|
||||
}
|
||||
|
||||
const deleteHandler = async (segmentPath: string) => {
|
||||
if (segmentPath.endsWith('.ts') !== true) return
|
||||
|
||||
logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags())
|
||||
|
||||
try {
|
||||
await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
|
||||
} catch (err) {
|
||||
logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
|
||||
}
|
||||
|
||||
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
|
||||
try {
|
||||
await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
|
||||
} catch (err) {
|
||||
logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.filesWatcher.on('add', p => addHandler(p))
|
||||
this.filesWatcher.on('unlink', p => deleteHandler(p))
|
||||
}
|
||||
|
||||
private async isQuotaExceeded (segmentPath: string) {
|
||||
if (this.saveReplay !== true) return false
|
||||
if (this.aborted) return false
|
||||
|
||||
try {
|
||||
const segmentStat = await stat(segmentPath)
|
||||
|
||||
LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.sessionId, segmentStat.size)
|
||||
|
||||
const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)
|
||||
|
||||
return canUpload !== true
|
||||
} catch (err) {
|
||||
logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() })
|
||||
}
|
||||
}
|
||||
|
||||
private createFiles () {
|
||||
for (let i = 0; i < this.allResolutions.length; i++) {
|
||||
const resolution = this.allResolutions[i]
|
||||
|
||||
const file = new VideoFileModel({
|
||||
resolution,
|
||||
size: -1,
|
||||
extname: '.ts',
|
||||
infoHash: null,
|
||||
fps: this.fps,
|
||||
storage: this.streamingPlaylist.storage,
|
||||
videoStreamingPlaylistId: this.streamingPlaylist.id
|
||||
})
|
||||
|
||||
VideoFileModel.customUpsert(file, 'streaming-playlist', null)
|
||||
.catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() }))
|
||||
}
|
||||
}
|
||||
|
||||
private async prepareDirectories () {
|
||||
await ensureDir(this.outDirectory)
|
||||
|
||||
if (this.videoLive.saveReplay === true) {
|
||||
await ensureDir(this.replayDirectory)
|
||||
}
|
||||
}
|
||||
|
||||
private isDurationConstraintValid (streamingStartTime: number) {
|
||||
const maxDuration = CONFIG.LIVE.MAX_DURATION
|
||||
// No limit
|
||||
if (maxDuration < 0) return true
|
||||
|
||||
const now = new Date().getTime()
|
||||
const max = streamingStartTime + maxDuration
|
||||
|
||||
return now <= max
|
||||
}
|
||||
|
||||
private processSegments (segmentPaths: string[]) {
|
||||
Bluebird.mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
|
||||
.catch(err => {
|
||||
if (this.aborted) return
|
||||
|
||||
logger.error('Cannot process segments', { err, ...this.lTags() })
|
||||
})
|
||||
}
|
||||
|
||||
private async processSegment (segmentPath: string) {
|
||||
// Add sha hash of previous segments, because ffmpeg should have finished generating them
|
||||
await this.liveSegmentShaStore.addSegmentSha(segmentPath)
|
||||
|
||||
if (this.saveReplay) {
|
||||
await this.addSegmentToReplay(segmentPath)
|
||||
}
|
||||
|
||||
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
|
||||
try {
|
||||
await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
|
||||
|
||||
await this.processM3U8ToObjectStorage(segmentPath)
|
||||
} catch (err) {
|
||||
logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
|
||||
}
|
||||
}
|
||||
|
||||
// Master playlist and segment JSON file are created, live is ready
|
||||
if (this.masterPlaylistCreated && !this.liveReady) {
|
||||
this.liveReady = true
|
||||
|
||||
this.emit('live-ready', { videoUUID: this.videoUUID })
|
||||
}
|
||||
}
|
||||
|
||||
private async processM3U8ToObjectStorage (segmentPath: string) {
|
||||
const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
|
||||
|
||||
logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
|
||||
|
||||
const segmentName = basename(segmentPath)
|
||||
|
||||
const playlistContent = await readFile(m3u8Path, 'utf-8')
|
||||
// Remove new chunk references, that will be processed later
|
||||
const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
|
||||
|
||||
try {
|
||||
if (!this.objectStorageSendQueues.has(m3u8Path)) {
|
||||
this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
|
||||
}
|
||||
|
||||
const queue = this.objectStorageSendQueues.get(m3u8Path)
|
||||
await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
|
||||
} catch (err) {
|
||||
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
|
||||
}
|
||||
}
|
||||
|
||||
private onTranscodingError () {
|
||||
this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
|
||||
}
|
||||
|
||||
private onTranscodedEnded () {
|
||||
this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
|
||||
|
||||
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags())
|
||||
|
||||
setTimeout(() => {
|
||||
// Wait latest segments generation, and close watchers
|
||||
|
||||
const promise = this.filesWatcher?.close() || Promise.resolve()
|
||||
promise
|
||||
.then(() => {
|
||||
// Process remaining segments hash
|
||||
for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
|
||||
this.processSegments(this.segmentsToProcessPerPlaylist[key])
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error(
|
||||
'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
|
||||
{ err, ...this.lTags() }
|
||||
)
|
||||
})
|
||||
|
||||
this.emit('after-cleanup', { videoUUID: this.videoUUID })
|
||||
}, 1000)
|
||||
}
|
||||
|
||||
private hasClientSocketInBadHealth (sessionId: string) {
|
||||
const rtmpSession = this.context.sessions.get(sessionId)
|
||||
|
||||
if (!rtmpSession) {
|
||||
logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags())
|
||||
return
|
||||
}
|
||||
|
||||
for (const playerSessionId of rtmpSession.players) {
|
||||
const playerSession = this.context.sessions.get(playerSessionId)
|
||||
|
||||
if (!playerSession) {
|
||||
logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags())
|
||||
continue
|
||||
}
|
||||
|
||||
if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private async addSegmentToReplay (segmentPath: string) {
|
||||
const segmentName = basename(segmentPath)
|
||||
const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
|
||||
|
||||
try {
|
||||
const data = await readFile(segmentPath)
|
||||
|
||||
await appendFile(dest, data)
|
||||
} catch (err) {
|
||||
logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() })
|
||||
}
|
||||
}
|
||||
|
||||
private async createLivePlaylist (): Promise<MStreamingPlaylistVideo> {
|
||||
const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video)
|
||||
|
||||
playlist.playlistFilename = generateHLSMasterPlaylistFilename(true)
|
||||
playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true)
|
||||
|
||||
playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
|
||||
playlist.type = VideoStreamingPlaylistType.HLS
|
||||
|
||||
playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
|
||||
? VideoStorage.OBJECT_STORAGE
|
||||
: VideoStorage.FILE_SYSTEM
|
||||
|
||||
return playlist.save()
|
||||
}
|
||||
|
||||
private createLiveShaStore () {
|
||||
this.liveSegmentShaStore = new LiveSegmentShaStore({
|
||||
videoUUID: this.videoLive.Video.uuid,
|
||||
sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
|
||||
streamingPlaylist: this.streamingPlaylist,
|
||||
sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
|
||||
})
|
||||
}
|
||||
|
||||
private buildTranscodingWrapper () {
|
||||
const options = {
|
||||
streamingPlaylist: this.streamingPlaylist,
|
||||
videoLive: this.videoLive,
|
||||
|
||||
lTags: this.lTags,
|
||||
|
||||
sessionId: this.sessionId,
|
||||
inputLocalUrl: this.inputLocalUrl,
|
||||
inputPublicUrl: this.inputPublicUrl,
|
||||
|
||||
toTranscode: this.allResolutions.map(resolution => ({
|
||||
resolution,
|
||||
fps: computeOutputFPS({ inputFPS: this.fps, resolution })
|
||||
})),
|
||||
|
||||
fps: this.fps,
|
||||
bitrate: this.bitrate,
|
||||
ratio: this.ratio,
|
||||
hasAudio: this.hasAudio,
|
||||
|
||||
segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
|
||||
segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
|
||||
|
||||
outDirectory: this.outDirectory
|
||||
}
|
||||
|
||||
return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
|
||||
? new RemoteTranscodingWrapper(options)
|
||||
: new FFmpegTranscodingWrapper(options)
|
||||
}
|
||||
|
||||
private getPlaylistIdFromTS (segmentPath: string) {
|
||||
const playlistIdMatcher = /^([\d+])-/
|
||||
|
||||
return basename(segmentPath).match(playlistIdMatcher)[1]
|
||||
}
|
||||
|
||||
private getPlaylistNameFromTS (segmentPath: string) {
|
||||
return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
MuxingSession
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
import { LiveVideoErrorType } from '@peertube/peertube-models'
|
||||
import { LoggerTagsFn } from '@server/helpers/logger.js'
|
||||
import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models/index.js'
|
||||
import EventEmitter from 'events'
|
||||
|
||||
interface TranscodingWrapperEvents {
|
||||
'end': () => void
|
||||
|
||||
'error': (options: { err: Error }) => void
|
||||
}
|
||||
|
||||
declare interface AbstractTranscodingWrapper {
|
||||
on<U extends keyof TranscodingWrapperEvents>(
|
||||
event: U, listener: TranscodingWrapperEvents[U]
|
||||
): this
|
||||
|
||||
emit<U extends keyof TranscodingWrapperEvents>(
|
||||
event: U, ...args: Parameters<TranscodingWrapperEvents[U]>
|
||||
): boolean
|
||||
}
|
||||
|
||||
interface AbstractTranscodingWrapperOptions {
|
||||
streamingPlaylist: MStreamingPlaylistVideo
|
||||
videoLive: MVideoLiveVideo
|
||||
|
||||
lTags: LoggerTagsFn
|
||||
|
||||
sessionId: string
|
||||
inputLocalUrl: string
|
||||
inputPublicUrl: string
|
||||
|
||||
fps: number
|
||||
toTranscode: {
|
||||
resolution: number
|
||||
fps: number
|
||||
}[]
|
||||
|
||||
bitrate: number
|
||||
ratio: number
|
||||
hasAudio: boolean
|
||||
|
||||
segmentListSize: number
|
||||
segmentDuration: number
|
||||
|
||||
outDirectory: string
|
||||
}
|
||||
|
||||
abstract class AbstractTranscodingWrapper extends EventEmitter {
|
||||
protected readonly videoLive: MVideoLiveVideo
|
||||
|
||||
protected readonly toTranscode: {
|
||||
resolution: number
|
||||
fps: number
|
||||
}[]
|
||||
|
||||
protected readonly sessionId: string
|
||||
protected readonly inputLocalUrl: string
|
||||
protected readonly inputPublicUrl: string
|
||||
|
||||
protected readonly fps: number
|
||||
protected readonly bitrate: number
|
||||
protected readonly ratio: number
|
||||
protected readonly hasAudio: boolean
|
||||
|
||||
protected readonly segmentListSize: number
|
||||
protected readonly segmentDuration: number
|
||||
|
||||
protected readonly videoUUID: string
|
||||
|
||||
protected readonly outDirectory: string
|
||||
|
||||
protected readonly lTags: LoggerTagsFn
|
||||
|
||||
protected readonly streamingPlaylist: MStreamingPlaylistVideo
|
||||
|
||||
constructor (options: AbstractTranscodingWrapperOptions) {
|
||||
super()
|
||||
|
||||
this.lTags = options.lTags
|
||||
|
||||
this.videoLive = options.videoLive
|
||||
this.videoUUID = options.videoLive.Video.uuid
|
||||
this.streamingPlaylist = options.streamingPlaylist
|
||||
|
||||
this.sessionId = options.sessionId
|
||||
this.inputLocalUrl = options.inputLocalUrl
|
||||
this.inputPublicUrl = options.inputPublicUrl
|
||||
|
||||
this.fps = options.fps
|
||||
this.toTranscode = options.toTranscode
|
||||
|
||||
this.bitrate = options.bitrate
|
||||
this.ratio = options.ratio
|
||||
this.hasAudio = options.hasAudio
|
||||
|
||||
this.segmentListSize = options.segmentListSize
|
||||
this.segmentDuration = options.segmentDuration
|
||||
|
||||
this.outDirectory = options.outDirectory
|
||||
}
|
||||
|
||||
abstract run (): Promise<void>
|
||||
|
||||
abstract abort (error?: LiveVideoErrorType): void
|
||||
}
|
||||
|
||||
export {
|
||||
type AbstractTranscodingWrapperOptions,
|
||||
|
||||
AbstractTranscodingWrapper
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
import { FfmpegCommand } from 'fluent-ffmpeg'
|
||||
import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg/index.js'
|
||||
import { logger } from '@server/helpers/logger.js'
|
||||
import { CONFIG } from '@server/initializers/config.js'
|
||||
import { VIDEO_LIVE } from '@server/initializers/constants.js'
|
||||
import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles.js'
|
||||
import { FFmpegLive } from '@peertube/peertube-ffmpeg'
|
||||
import { getLiveSegmentTime } from '../../live-utils.js'
|
||||
import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper.js'
|
||||
|
||||
export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
|
||||
private ffmpegCommand: FfmpegCommand
|
||||
|
||||
private aborted = false
|
||||
private errored = false
|
||||
private ended = false
|
||||
|
||||
async run () {
|
||||
this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
|
||||
? await this.buildFFmpegLive().getLiveTranscodingCommand({
|
||||
inputUrl: this.inputLocalUrl,
|
||||
|
||||
outPath: this.outDirectory,
|
||||
masterPlaylistName: this.streamingPlaylist.playlistFilename,
|
||||
|
||||
segmentListSize: this.segmentListSize,
|
||||
segmentDuration: this.segmentDuration,
|
||||
|
||||
toTranscode: this.toTranscode,
|
||||
|
||||
bitrate: this.bitrate,
|
||||
ratio: this.ratio,
|
||||
|
||||
hasAudio: this.hasAudio
|
||||
})
|
||||
: this.buildFFmpegLive().getLiveMuxingCommand({
|
||||
inputUrl: this.inputLocalUrl,
|
||||
outPath: this.outDirectory,
|
||||
|
||||
masterPlaylistName: this.streamingPlaylist.playlistFilename,
|
||||
|
||||
segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
|
||||
segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode)
|
||||
})
|
||||
|
||||
logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags())
|
||||
|
||||
let ffmpegShellCommand: string
|
||||
this.ffmpegCommand.on('start', cmdline => {
|
||||
ffmpegShellCommand = cmdline
|
||||
|
||||
logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
|
||||
})
|
||||
|
||||
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
|
||||
this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
|
||||
})
|
||||
|
||||
this.ffmpegCommand.on('end', () => {
|
||||
this.onFFmpegEnded()
|
||||
})
|
||||
|
||||
this.ffmpegCommand.run()
|
||||
}
|
||||
|
||||
abort () {
|
||||
if (this.ended || this.errored || this.aborted) return
|
||||
|
||||
logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
|
||||
|
||||
this.ffmpegCommand.kill('SIGINT')
|
||||
|
||||
this.aborted = true
|
||||
this.emit('end')
|
||||
}
|
||||
|
||||
private onFFmpegError (options: {
|
||||
err: any
|
||||
stdout: string
|
||||
stderr: string
|
||||
ffmpegShellCommand: string
|
||||
}) {
|
||||
const { err, stdout, stderr, ffmpegShellCommand } = options
|
||||
|
||||
// Don't care that we killed the ffmpeg process
|
||||
if (err?.message?.includes('Exiting normally')) return
|
||||
if (this.ended || this.errored || this.aborted) return
|
||||
|
||||
logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
|
||||
|
||||
this.errored = true
|
||||
this.emit('error', { err })
|
||||
}
|
||||
|
||||
private onFFmpegEnded () {
|
||||
if (this.ended || this.errored || this.aborted) return
|
||||
|
||||
logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
|
||||
|
||||
this.ended = true
|
||||
this.emit('end')
|
||||
}
|
||||
|
||||
private buildFFmpegLive () {
|
||||
return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
|
||||
}
|
||||
}
|
3
server/core/lib/live/shared/transcoding-wrapper/index.ts
Normal file
3
server/core/lib/live/shared/transcoding-wrapper/index.ts
Normal file
|
@ -0,0 +1,3 @@
|
|||
export * from './abstract-transcoding-wrapper.js'
|
||||
export * from './ffmpeg-transcoding-wrapper.js'
|
||||
export * from './remote-transcoding-wrapper.js'
|
|
@ -0,0 +1,21 @@
|
|||
import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners/index.js'
|
||||
import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper.js'
|
||||
|
||||
export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
|
||||
async run () {
|
||||
await new LiveRTMPHLSTranscodingJobHandler().create({
|
||||
rtmpUrl: this.inputPublicUrl,
|
||||
sessionId: this.sessionId,
|
||||
toTranscode: this.toTranscode,
|
||||
video: this.videoLive.Video,
|
||||
outputDirectory: this.outDirectory,
|
||||
playlist: this.streamingPlaylist,
|
||||
segmentListSize: this.segmentListSize,
|
||||
segmentDuration: this.segmentDuration
|
||||
})
|
||||
}
|
||||
|
||||
abort () {
|
||||
this.emit('end')
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue