mirror of
https://github.com/Chocobozzz/PeerTube.git
synced 2025-10-05 10:49:28 +02:00
Use private ACL for private videos in s3
This commit is contained in:
parent
3545e72c68
commit
9ab330b90d
46 changed files with 1753 additions and 845 deletions
|
@ -2,18 +2,21 @@ import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-e
|
|||
import { dirname } from 'path'
|
||||
import { Readable } from 'stream'
|
||||
import {
|
||||
_Object,
|
||||
CompleteMultipartUploadCommandOutput,
|
||||
DeleteObjectCommand,
|
||||
GetObjectCommand,
|
||||
ListObjectsV2Command,
|
||||
PutObjectCommandInput
|
||||
PutObjectAclCommand,
|
||||
PutObjectCommandInput,
|
||||
S3Client
|
||||
} from '@aws-sdk/client-s3'
|
||||
import { Upload } from '@aws-sdk/lib-storage'
|
||||
import { pipelinePromise } from '@server/helpers/core-utils'
|
||||
import { isArray } from '@server/helpers/custom-validators/misc'
|
||||
import { logger } from '@server/helpers/logger'
|
||||
import { CONFIG } from '@server/initializers/config'
|
||||
import { getPrivateUrl } from '../urls'
|
||||
import { getInternalUrl } from '../urls'
|
||||
import { getClient } from './client'
|
||||
import { lTags } from './logger'
|
||||
|
||||
|
@ -44,69 +47,91 @@ async function storeObject (options: {
|
|||
inputPath: string
|
||||
objectStorageKey: string
|
||||
bucketInfo: BucketInfo
|
||||
isPrivate: boolean
|
||||
}): Promise<string> {
|
||||
const { inputPath, objectStorageKey, bucketInfo } = options
|
||||
const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
|
||||
|
||||
logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
|
||||
|
||||
const fileStream = createReadStream(inputPath)
|
||||
|
||||
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
|
||||
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function removeObject (filename: string, bucketInfo: BucketInfo) {
|
||||
const command = new DeleteObjectCommand({
|
||||
function updateObjectACL (options: {
|
||||
objectStorageKey: string
|
||||
bucketInfo: BucketInfo
|
||||
isPrivate: boolean
|
||||
}) {
|
||||
const { objectStorageKey, bucketInfo, isPrivate } = options
|
||||
|
||||
const key = buildKey(objectStorageKey, bucketInfo)
|
||||
|
||||
logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
|
||||
|
||||
const command = new PutObjectAclCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: buildKey(filename, bucketInfo)
|
||||
Key: key,
|
||||
ACL: getACL(isPrivate)
|
||||
})
|
||||
|
||||
return getClient().send(command)
|
||||
}
|
||||
|
||||
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
|
||||
const s3Client = getClient()
|
||||
function updatePrefixACL (options: {
|
||||
prefix: string
|
||||
bucketInfo: BucketInfo
|
||||
isPrivate: boolean
|
||||
}) {
|
||||
const { prefix, bucketInfo, isPrivate } = options
|
||||
|
||||
const commandPrefix = bucketInfo.PREFIX + prefix
|
||||
const listCommand = new ListObjectsV2Command({
|
||||
logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
|
||||
|
||||
return applyOnPrefix({
|
||||
prefix,
|
||||
bucketInfo,
|
||||
commandBuilder: obj => {
|
||||
return new PutObjectAclCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: obj.Key,
|
||||
ACL: getACL(isPrivate)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
|
||||
const key = buildKey(objectStorageKey, bucketInfo)
|
||||
|
||||
logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
|
||||
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Prefix: commandPrefix
|
||||
Key: key
|
||||
})
|
||||
|
||||
const listedObjects = await s3Client.send(listCommand)
|
||||
return getClient().send(command)
|
||||
}
|
||||
|
||||
function removePrefix (prefix: string, bucketInfo: BucketInfo) {
|
||||
// FIXME: use bulk delete when s3ninja will support this operation
|
||||
// const deleteParams = {
|
||||
// Bucket: bucketInfo.BUCKET_NAME,
|
||||
// Delete: { Objects: [] }
|
||||
// }
|
||||
|
||||
if (isArray(listedObjects.Contents) !== true) {
|
||||
const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
|
||||
logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
|
||||
|
||||
logger.error(message, { response: listedObjects, ...lTags() })
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
for (const object of listedObjects.Contents) {
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: object.Key
|
||||
})
|
||||
|
||||
await s3Client.send(command)
|
||||
|
||||
// FIXME: use bulk delete when s3ninja will support this operation
|
||||
// deleteParams.Delete.Objects.push({ Key: object.Key })
|
||||
}
|
||||
|
||||
// FIXME: use bulk delete when s3ninja will support this operation
|
||||
// const deleteCommand = new DeleteObjectsCommand(deleteParams)
|
||||
// await s3Client.send(deleteCommand)
|
||||
|
||||
// Repeat if not all objects could be listed at once (limit of 1000?)
|
||||
if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
|
||||
return applyOnPrefix({
|
||||
prefix,
|
||||
bucketInfo,
|
||||
commandBuilder: obj => {
|
||||
return new DeleteObjectCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: obj.Key
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
@ -138,14 +163,42 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
|
|||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function createObjectReadStream (options: {
|
||||
key: string
|
||||
bucketInfo: BucketInfo
|
||||
rangeHeader: string
|
||||
}) {
|
||||
const { key, bucketInfo, rangeHeader } = options
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: buildKey(key, bucketInfo),
|
||||
Range: rangeHeader
|
||||
})
|
||||
|
||||
const response = await getClient().send(command)
|
||||
|
||||
return response.Body as Readable
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
BucketInfo,
|
||||
buildKey,
|
||||
|
||||
storeObject,
|
||||
|
||||
removeObject,
|
||||
removePrefix,
|
||||
|
||||
makeAvailable,
|
||||
listKeysOfPrefix
|
||||
|
||||
updateObjectACL,
|
||||
updatePrefixACL,
|
||||
|
||||
listKeysOfPrefix,
|
||||
createObjectReadStream
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
@ -154,17 +207,15 @@ async function uploadToStorage (options: {
|
|||
content: ReadStream
|
||||
objectStorageKey: string
|
||||
bucketInfo: BucketInfo
|
||||
isPrivate: boolean
|
||||
}) {
|
||||
const { content, objectStorageKey, bucketInfo } = options
|
||||
const { content, objectStorageKey, bucketInfo, isPrivate } = options
|
||||
|
||||
const input: PutObjectCommandInput = {
|
||||
Body: content,
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Key: buildKey(objectStorageKey, bucketInfo)
|
||||
}
|
||||
|
||||
if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
|
||||
input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
|
||||
Key: buildKey(objectStorageKey, bucketInfo),
|
||||
ACL: getACL(isPrivate)
|
||||
}
|
||||
|
||||
const parallelUploads3 = new Upload({
|
||||
|
@ -194,5 +245,50 @@ async function uploadToStorage (options: {
|
|||
bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
|
||||
)
|
||||
|
||||
return getPrivateUrl(bucketInfo, objectStorageKey)
|
||||
return getInternalUrl(bucketInfo, objectStorageKey)
|
||||
}
|
||||
|
||||
async function applyOnPrefix (options: {
|
||||
prefix: string
|
||||
bucketInfo: BucketInfo
|
||||
commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
|
||||
|
||||
continuationToken?: string
|
||||
}) {
|
||||
const { prefix, bucketInfo, commandBuilder, continuationToken } = options
|
||||
|
||||
const s3Client = getClient()
|
||||
|
||||
const commandPrefix = bucketInfo.PREFIX + prefix
|
||||
const listCommand = new ListObjectsV2Command({
|
||||
Bucket: bucketInfo.BUCKET_NAME,
|
||||
Prefix: commandPrefix,
|
||||
ContinuationToken: continuationToken
|
||||
})
|
||||
|
||||
const listedObjects = await s3Client.send(listCommand)
|
||||
|
||||
if (isArray(listedObjects.Contents) !== true) {
|
||||
const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
|
||||
|
||||
logger.error(message, { response: listedObjects, ...lTags() })
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
for (const object of listedObjects.Contents) {
|
||||
const command = commandBuilder(object)
|
||||
|
||||
await s3Client.send(command)
|
||||
}
|
||||
|
||||
// Repeat if not all objects could be listed at once (limit of 1000?)
|
||||
if (listedObjects.IsTruncated) {
|
||||
await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
|
||||
}
|
||||
}
|
||||
|
||||
function getACL (isPrivate: boolean) {
|
||||
return isPrivate
|
||||
? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
|
||||
: CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
|
||||
}
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
import { CONFIG } from '@server/initializers/config'
|
||||
import { OBJECT_STORAGE_PROXY_PATHS, WEBSERVER } from '@server/initializers/constants'
|
||||
import { MVideoUUID } from '@server/types/models'
|
||||
import { BucketInfo, buildKey, getEndpointParsed } from './shared'
|
||||
|
||||
function getPrivateUrl (config: BucketInfo, keyWithoutPrefix: string) {
|
||||
function getInternalUrl (config: BucketInfo, keyWithoutPrefix: string) {
|
||||
return getBaseUrl(config) + buildKey(keyWithoutPrefix, config)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function getWebTorrentPublicFileUrl (fileUrl: string) {
|
||||
const baseUrl = CONFIG.OBJECT_STORAGE.VIDEOS.BASE_URL
|
||||
if (!baseUrl) return fileUrl
|
||||
|
@ -19,11 +23,28 @@ function getHLSPublicFileUrl (fileUrl: string) {
|
|||
return replaceByBaseUrl(fileUrl, baseUrl)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function getHLSPrivateFileUrl (video: MVideoUUID, filename: string) {
|
||||
return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.STREAMING_PLAYLISTS.PRIVATE_HLS + video.uuid + `/${filename}`
|
||||
}
|
||||
|
||||
function getWebTorrentPrivateFileUrl (filename: string) {
|
||||
return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.PRIVATE_WEBSEED + filename
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
getPrivateUrl,
|
||||
getInternalUrl,
|
||||
|
||||
getWebTorrentPublicFileUrl,
|
||||
replaceByBaseUrl,
|
||||
getHLSPublicFileUrl
|
||||
getHLSPublicFileUrl,
|
||||
|
||||
getHLSPrivateFileUrl,
|
||||
getWebTorrentPrivateFileUrl,
|
||||
|
||||
replaceByBaseUrl
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
@ -5,7 +5,17 @@ import { MStreamingPlaylistVideo, MVideo, MVideoFile } from '@server/types/model
|
|||
import { getHLSDirectory } from '../paths'
|
||||
import { VideoPathManager } from '../video-path-manager'
|
||||
import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
|
||||
import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
|
||||
import {
|
||||
createObjectReadStream,
|
||||
listKeysOfPrefix,
|
||||
lTags,
|
||||
makeAvailable,
|
||||
removeObject,
|
||||
removePrefix,
|
||||
storeObject,
|
||||
updateObjectACL,
|
||||
updatePrefixACL
|
||||
} from './shared'
|
||||
|
||||
function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) {
|
||||
return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
|
||||
|
@ -17,7 +27,8 @@ function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename:
|
|||
return storeObject({
|
||||
inputPath: join(getHLSDirectory(playlist.Video), filename),
|
||||
objectStorageKey: generateHLSObjectStorageKey(playlist, filename),
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
|
||||
isPrivate: playlist.Video.hasPrivateStaticPath()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -25,7 +36,8 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string)
|
|||
return storeObject({
|
||||
inputPath: path,
|
||||
objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
|
||||
isPrivate: playlist.Video.hasPrivateStaticPath()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -35,7 +47,26 @@ function storeWebTorrentFile (video: MVideo, file: MVideoFile) {
|
|||
return storeObject({
|
||||
inputPath: VideoPathManager.Instance.getFSVideoFileOutputPath(video, file),
|
||||
objectStorageKey: generateWebTorrentObjectStorageKey(file.filename),
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
|
||||
isPrivate: video.hasPrivateStaticPath()
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function updateWebTorrentFileACL (video: MVideo, file: MVideoFile) {
|
||||
return updateObjectACL({
|
||||
objectStorageKey: generateWebTorrentObjectStorageKey(file.filename),
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
|
||||
isPrivate: video.hasPrivateStaticPath()
|
||||
})
|
||||
}
|
||||
|
||||
function updateHLSFilesACL (playlist: MStreamingPlaylistVideo) {
|
||||
return updatePrefixACL({
|
||||
prefix: generateHLSObjectBaseStorageKey(playlist),
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
|
||||
isPrivate: playlist.Video.hasPrivateStaticPath()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -87,6 +118,39 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin
|
|||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function getWebTorrentFileReadStream (options: {
|
||||
filename: string
|
||||
rangeHeader: string
|
||||
}) {
|
||||
const { filename, rangeHeader } = options
|
||||
|
||||
const key = generateWebTorrentObjectStorageKey(filename)
|
||||
|
||||
return createObjectReadStream({
|
||||
key,
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
|
||||
rangeHeader
|
||||
})
|
||||
}
|
||||
|
||||
function getHLSFileReadStream (options: {
|
||||
playlist: MStreamingPlaylistVideo
|
||||
filename: string
|
||||
rangeHeader: string
|
||||
}) {
|
||||
const { playlist, filename, rangeHeader } = options
|
||||
|
||||
const key = generateHLSObjectStorageKey(playlist, filename)
|
||||
|
||||
return createObjectReadStream({
|
||||
key,
|
||||
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
|
||||
rangeHeader
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
listHLSFileKeysOf,
|
||||
|
||||
|
@ -94,10 +158,16 @@ export {
|
|||
storeHLSFileFromFilename,
|
||||
storeHLSFileFromPath,
|
||||
|
||||
updateWebTorrentFileACL,
|
||||
updateHLSFilesACL,
|
||||
|
||||
removeHLSObjectStorage,
|
||||
removeHLSFileObjectStorage,
|
||||
removeWebTorrentObjectStorage,
|
||||
|
||||
makeWebTorrentFileAvailable,
|
||||
makeHLSFileAvailable
|
||||
makeHLSFileAvailable,
|
||||
|
||||
getWebTorrentFileReadStream,
|
||||
getHLSFileReadStream
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue