From cb21cd243177be3bafdc7e57730fbec7cd39ba57 Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Thu, 20 Feb 2025 11:44:16 +0800 Subject: [PATCH] feat(stream): make BufferedTransformStream propagate cancel signal immediately --- .../stream-extra/src/buffered-transform.ts | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/libraries/stream-extra/src/buffered-transform.ts b/libraries/stream-extra/src/buffered-transform.ts index df311c61..4176364a 100644 --- a/libraries/stream-extra/src/buffered-transform.ts +++ b/libraries/stream-extra/src/buffered-transform.ts @@ -4,7 +4,10 @@ import { StructEmptyError } from "@yume-chan/struct"; import { BufferedReadableStream } from "./buffered.js"; import type { PushReadableStreamController } from "./push-readable.js"; import { PushReadableStream } from "./push-readable.js"; -import type { ReadableWritablePair } from "./stream.js"; +import type { + ReadableWritablePair, + WritableStreamDefaultController, +} from "./stream.js"; import { ReadableStream, WritableStream } from "./stream.js"; // TODO: BufferedTransformStream: find better implementation @@ -25,11 +28,13 @@ export class BufferedTransformStream transform: (stream: BufferedReadableStream) => MaybePromiseLike, ) { // Convert incoming chunks to a `BufferedReadableStream` - let sourceStreamController!: PushReadableStreamController; + let bufferedStreamController!: PushReadableStreamController; + + let writableStreamController!: WritableStreamDefaultController; const buffered = new BufferedReadableStream( new PushReadableStream((controller) => { - sourceStreamController = controller; + bufferedStreamController = controller; }), ); @@ -50,21 +55,26 @@ export class BufferedTransformStream } }, cancel: (reason) => { - // Propagate cancel to the source stream - // So future writes will be rejected - return buffered.cancel(reason); + // If a `ReadableStream` is piping into `#writable`, + // This will cancel the `ReadableStream` immediately. + // If upstream is writing using `#writable`'s writer, this will + // throw errors for any future writes + return writableStreamController.error(reason); }, }); this.#writable = new WritableStream({ + start(controller) { + writableStreamController = controller; + }, async write(chunk) { - await sourceStreamController.enqueue(chunk); + await bufferedStreamController.enqueue(chunk); }, abort() { - sourceStreamController.close(); + bufferedStreamController.close(); }, close() { - sourceStreamController.close(); + bufferedStreamController.close(); }, }); }