diff --git a/libraries/stream-extra/src/consumable.ts b/libraries/stream-extra/src/consumable.ts index b91a54b2..5f2d901c 100644 --- a/libraries/stream-extra/src/consumable.ts +++ b/libraries/stream-extra/src/consumable.ts @@ -1,133 +1,22 @@ import { PromiseResolver, isPromiseLike } from "@yume-chan/async"; import type { - QueuingStrategy, - WritableStreamDefaultController, - WritableStreamDefaultWriter, -} from "./stream.js"; + ConsumableReadableStreamController, + ConsumableReadableStreamSource, + ConsumableWritableStreamSink, +} from "./consumable/index.js"; import { - ReadableStream as NativeReadableStream, - WritableStream as NativeWritableStream, -} from "./stream.js"; + ConsumableReadableStream, + ConsumableWrapWritableStream, + ConsumableWritableStream, +} from "./consumable/index.js"; import type { Task } from "./task.js"; import { createTask } from "./task.js"; -// Workaround https://github.com/evanw/esbuild/issues/3923 -class WritableStream extends NativeWritableStream> { - static async write( - writer: WritableStreamDefaultWriter>, - value: T, - ) { - const consumable = new Consumable(value); - await writer.write(consumable); - await consumable.consumed; - } - - constructor( - sink: Consumable.WritableStreamSink, - strategy?: QueuingStrategy, - ) { - let wrappedStrategy: QueuingStrategy> | undefined; - if (strategy) { - wrappedStrategy = {}; - if ("highWaterMark" in strategy) { - wrappedStrategy.highWaterMark = strategy.highWaterMark; - } - if ("size" in strategy) { - wrappedStrategy.size = (chunk) => { - return strategy.size!( - chunk instanceof Consumable ? chunk.value : chunk, - ); - }; - } - } - - super( - { - start(controller) { - return sink.start?.(controller); - }, - async write(chunk, controller) { - await chunk.tryConsume((chunk) => - sink.write?.(chunk, controller), - ); - }, - abort(reason) { - return sink.abort?.(reason); - }, - close() { - return sink.close?.(); - }, - }, - wrappedStrategy, - ); - } -} - -class ReadableStream extends NativeReadableStream> { - static async enqueue( - controller: { enqueue: (chunk: Consumable) => void }, - chunk: T, - ) { - const output = new Consumable(chunk); - controller.enqueue(output); - await output.consumed; - } - - constructor( - source: Consumable.ReadableStreamSource, - strategy?: QueuingStrategy, - ) { - let wrappedController: - | Consumable.ReadableStreamController - | undefined; - - let wrappedStrategy: QueuingStrategy> | undefined; - if (strategy) { - wrappedStrategy = {}; - if ("highWaterMark" in strategy) { - wrappedStrategy.highWaterMark = strategy.highWaterMark; - } - if ("size" in strategy) { - wrappedStrategy.size = (chunk) => { - return strategy.size!(chunk.value); - }; - } - } - - super( - { - async start(controller) { - wrappedController = { - async enqueue(chunk) { - await ReadableStream.enqueue(controller, chunk); - }, - close() { - controller.close(); - }, - error(reason) { - controller.error(reason); - }, - }; - - await source.start?.(wrappedController); - }, - async pull() { - await source.pull?.(wrappedController!); - }, - async cancel(reason) { - await source.cancel?.(reason); - }, - }, - wrappedStrategy, - ); - } -} - export class Consumable { - static readonly WritableStream = WritableStream; - - static readonly ReadableStream = ReadableStream; + static readonly WritableStream = ConsumableWritableStream; + static readonly WrapWritableStream = ConsumableWrapWritableStream; + static readonly ReadableStream = ConsumableReadableStream; readonly #task: Task; readonly #resolver: PromiseResolver; @@ -176,35 +65,14 @@ export class Consumable { } export namespace Consumable { - export interface WritableStreamSink { - start?( - controller: WritableStreamDefaultController, - ): void | PromiseLike; - write?( - chunk: T, - controller: WritableStreamDefaultController, - ): void | PromiseLike; - abort?(reason: unknown): void | PromiseLike; - close?(): void | PromiseLike; - } + export type WritableStreamSink = ConsumableWritableStreamSink; + export type WritableStream = typeof ConsumableWritableStream; - export type WritableStream = typeof Consumable.WritableStream; + export type WrapWritableStream = + typeof ConsumableWrapWritableStream; - export interface ReadableStreamController { - enqueue(chunk: T): Promise; - close(): void; - error(reason: unknown): void; - } - - export interface ReadableStreamSource { - start?( - controller: ReadableStreamController, - ): void | PromiseLike; - pull?( - controller: ReadableStreamController, - ): void | PromiseLike; - cancel?(reason: unknown): void | PromiseLike; - } - - export type ReadableStream = typeof Consumable.ReadableStream; + export type ReadableStreamController = + ConsumableReadableStreamController; + export type ReadableStreamSource = ConsumableReadableStreamSource; + export type ReadableStream = typeof ConsumableReadableStream; } diff --git a/libraries/stream-extra/src/consumable/index.ts b/libraries/stream-extra/src/consumable/index.ts new file mode 100644 index 00000000..1a69dba2 --- /dev/null +++ b/libraries/stream-extra/src/consumable/index.ts @@ -0,0 +1,3 @@ +export * from "./readable.js"; +export * from "./wrap-writable.js"; +export * from "./writable.js"; diff --git a/libraries/stream-extra/src/consumable/readable.ts b/libraries/stream-extra/src/consumable/readable.ts new file mode 100644 index 00000000..d3fe5c92 --- /dev/null +++ b/libraries/stream-extra/src/consumable/readable.ts @@ -0,0 +1,80 @@ +import { Consumable } from "../consumable.js"; +import type { QueuingStrategy } from "../stream.js"; +import { ReadableStream } from "../stream.js"; + +export interface ConsumableReadableStreamController { + enqueue(chunk: T): Promise; + close(): void; + error(reason: unknown): void; +} + +export interface ConsumableReadableStreamSource { + start?( + controller: ConsumableReadableStreamController, + ): void | PromiseLike; + pull?( + controller: ConsumableReadableStreamController, + ): void | PromiseLike; + cancel?(reason: unknown): void | PromiseLike; +} + +export class ConsumableReadableStream extends ReadableStream> { + static async enqueue( + controller: { enqueue: (chunk: Consumable) => void }, + chunk: T, + ) { + const output = new Consumable(chunk); + controller.enqueue(output); + await output.consumed; + } + + constructor( + source: ConsumableReadableStreamSource, + strategy?: QueuingStrategy, + ) { + let wrappedController!: ConsumableReadableStreamController; + + let wrappedStrategy: QueuingStrategy> | undefined; + if (strategy) { + wrappedStrategy = {}; + if ("highWaterMark" in strategy) { + wrappedStrategy.highWaterMark = strategy.highWaterMark; + } + if ("size" in strategy) { + wrappedStrategy.size = (chunk) => { + return strategy.size!(chunk.value); + }; + } + } + + super( + { + start(controller) { + wrappedController = { + enqueue(chunk) { + return ConsumableReadableStream.enqueue( + controller, + chunk, + ); + }, + close() { + controller.close(); + }, + error(reason) { + controller.error(reason); + }, + }; + + return source.start?.(wrappedController); + }, + pull() { + return source.pull?.(wrappedController); + }, + cancel(reason) { + return source.cancel?.(reason); + }, + }, + wrappedStrategy, + ); + } +} diff --git a/libraries/stream-extra/src/consumable/wrap-writable.spec.ts b/libraries/stream-extra/src/consumable/wrap-writable.spec.ts new file mode 100644 index 00000000..dfd747fb --- /dev/null +++ b/libraries/stream-extra/src/consumable/wrap-writable.spec.ts @@ -0,0 +1,41 @@ +import * as assert from "node:assert"; +import { describe, it } from "node:test"; + +describe("Consumable", () => { + describe("WritableStream", () => { + it("should not pause the source stream while piping", async () => { + let step = 0; + + const stream = new WritableStream({ + write(chunk) { + switch (step) { + case 2: + assert.strictEqual(chunk, "a"); + step += 1; + break; + case 3: + assert.strictEqual(chunk, "b"); + step += 1; + break; + } + }, + }); + + const readable = new ReadableStream({ + start(controller) { + controller.enqueue("a"); + assert.strictEqual(step, 0); + step += 1; + + controller.enqueue("b"); + assert.strictEqual(step, 1); + step += 1; + + controller.close(); + }, + }); + + await readable.pipeTo(stream); + }); + }); +}); diff --git a/libraries/stream-extra/src/consumable/wrap-writable.ts b/libraries/stream-extra/src/consumable/wrap-writable.ts new file mode 100644 index 00000000..ca6edca0 --- /dev/null +++ b/libraries/stream-extra/src/consumable/wrap-writable.ts @@ -0,0 +1,21 @@ +import type { Consumable } from "../consumable.js"; +import { WritableStream } from "../stream.js"; + +export class ConsumableWrapWritableStream extends WritableStream< + Consumable +> { + constructor(stream: WritableStream) { + const writer = stream.getWriter(); + super({ + write(chunk) { + return chunk.tryConsume((chunk) => writer.write(chunk)); + }, + abort(reason) { + return writer.abort(reason); + }, + close() { + return writer.close(); + }, + }); + } +} diff --git a/libraries/stream-extra/src/consumable/writable.ts b/libraries/stream-extra/src/consumable/writable.ts new file mode 100644 index 00000000..9bf18186 --- /dev/null +++ b/libraries/stream-extra/src/consumable/writable.ts @@ -0,0 +1,72 @@ +import { Consumable } from "../consumable.js"; +import type { + QueuingStrategy, + WritableStreamDefaultController, + WritableStreamDefaultWriter, +} from "../stream.js"; +import { WritableStream } from "../stream.js"; + +export interface ConsumableWritableStreamSink { + start?( + controller: WritableStreamDefaultController, + ): void | PromiseLike; + write?( + chunk: T, + controller: WritableStreamDefaultController, + ): void | PromiseLike; + abort?(reason: unknown): void | PromiseLike; + close?(): void | PromiseLike; +} + +export class ConsumableWritableStream extends WritableStream< + Consumable +> { + static async write( + writer: WritableStreamDefaultWriter>, + value: T, + ) { + const consumable = new Consumable(value); + await writer.write(consumable); + await consumable.consumed; + } + + constructor( + sink: ConsumableWritableStreamSink, + strategy?: QueuingStrategy, + ) { + let wrappedStrategy: QueuingStrategy> | undefined; + if (strategy) { + wrappedStrategy = {}; + if ("highWaterMark" in strategy) { + wrappedStrategy.highWaterMark = strategy.highWaterMark; + } + if ("size" in strategy) { + wrappedStrategy.size = (chunk) => { + return strategy.size!( + chunk instanceof Consumable ? chunk.value : chunk, + ); + }; + } + } + + super( + { + start(controller) { + return sink.start?.(controller); + }, + write(chunk, controller) { + return chunk.tryConsume((chunk) => + sink.write?.(chunk, controller), + ); + }, + abort(reason) { + return sink.abort?.(reason); + }, + close() { + return sink.close?.(); + }, + }, + wrappedStrategy, + ); + } +} diff --git a/libraries/stream-extra/src/maybe-consumable.spec.ts b/libraries/stream-extra/src/maybe-consumable.spec.ts new file mode 100644 index 00000000..f23a905f --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable.spec.ts @@ -0,0 +1,22 @@ +import assert from "node:assert"; +import { describe, it } from "node:test"; + +import { MaybeConsumable } from "./maybe-consumable.js"; + +describe("MaybeConsumable", () => { + it("should export all symbols", () => { + assert( + !!MaybeConsumable.WrapWritableStream, + "WrapWritableStream should be define", + ); + + assert( + !!MaybeConsumable.WritableStream, + "WritableStream should be define", + ); + + assert(!!MaybeConsumable.getValue, "getValue should be define"); + + assert(!!MaybeConsumable.tryConsume, "tryConsume should be define"); + }); +}); diff --git a/libraries/stream-extra/src/maybe-consumable.ts b/libraries/stream-extra/src/maybe-consumable.ts index 34606984..34800aba 100644 --- a/libraries/stream-extra/src/maybe-consumable.ts +++ b/libraries/stream-extra/src/maybe-consumable.ts @@ -2,4 +2,4 @@ import type { Consumable } from "./consumable.js"; export type MaybeConsumable = T | Consumable; -export * as MaybeConsumable from "./maybe-consumable-ns.js"; +export * as MaybeConsumable from "./maybe-consumable/index.js"; diff --git a/libraries/stream-extra/src/maybe-consumable/index.ts b/libraries/stream-extra/src/maybe-consumable/index.ts new file mode 100644 index 00000000..93338ca3 --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable/index.ts @@ -0,0 +1,4 @@ +export * from "./utils.js"; +export { MaybeConsumableWrapWritableStream as WrapWritableStream } from "./wrap-writable.js"; +export { MaybeConsumableWritableStream as WritableStream } from "./writable.js"; +export type { MaybeConsumableWritableStreamSink as WritableStreamSink } from "./writable.js"; diff --git a/libraries/stream-extra/src/maybe-consumable/utils.spec.ts b/libraries/stream-extra/src/maybe-consumable/utils.spec.ts new file mode 100644 index 00000000..35442ab9 --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable/utils.spec.ts @@ -0,0 +1,42 @@ +import * as assert from "node:assert"; +import { describe, it } from "node:test"; + +import { Consumable } from "../consumable.js"; + +import { getValue, tryConsume } from "./utils.js"; + +describe("MaybeConsumable", () => { + describe("getValue", () => { + it("should return the original value if it's not Consumable", () => { + const value = {}; + assert.strictEqual(getValue(value), value); + }); + + it("should return the inner value if it's Consumable", () => { + const value = new Consumable({}); + assert.strictEqual(getValue(value), value.value); + }); + + it("should return undefined for undefined", () => { + assert.strictEqual(getValue(undefined), undefined); + }); + }); + + describe("tryConsume", () => { + it("should invoke the callback with the original value if it's not Consumable", () => { + const value = {}; + const callback = (got: unknown) => { + assert.strictEqual(got, value); + }; + tryConsume(value, callback); + }); + + it("should invoke the callback with the inner value if it's Consumable", () => { + const value = new Consumable({}); + const callback = (got: unknown) => { + assert.strictEqual(got, value.value); + }; + tryConsume(value, callback); + }); + }); +}); diff --git a/libraries/stream-extra/src/maybe-consumable/utils.ts b/libraries/stream-extra/src/maybe-consumable/utils.ts new file mode 100644 index 00000000..e3bb9c42 --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable/utils.ts @@ -0,0 +1,17 @@ +import { Consumable } from "../consumable.js"; +import type { MaybeConsumable } from "../maybe-consumable.js"; + +export function getValue(value: MaybeConsumable): T { + return value instanceof Consumable ? value.value : value; +} + +export function tryConsume( + value: T, + callback: (value: T extends Consumable ? U : T) => R, +): R { + if (value instanceof Consumable) { + return value.tryConsume(callback); + } else { + return callback(value as never); + } +} diff --git a/libraries/stream-extra/src/maybe-consumable/wrap-writable.spec.ts b/libraries/stream-extra/src/maybe-consumable/wrap-writable.spec.ts new file mode 100644 index 00000000..9ba1ae53 --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable/wrap-writable.spec.ts @@ -0,0 +1,80 @@ +import * as assert from "node:assert"; +import { describe, it } from "node:test"; + +import { Consumable } from "../consumable.js"; +import { WritableStream } from "../stream.js"; + +import { MaybeConsumableWrapWritableStream } from "./wrap-writable.js"; + +describe("MaybeConsumable", () => { + describe("WrapWritableStream", () => { + it("should write to inner stream", async () => { + let step = 0; + + const stream = new MaybeConsumableWrapWritableStream( + new WritableStream({ + write(chunk) { + switch (step) { + case 0: + assert.strictEqual(chunk, "a"); + step += 1; + break; + case 2: + assert.strictEqual(chunk, "b"); + step += 1; + break; + } + }, + }), + ); + const writer = stream.getWriter(); + + await writer.write("a"); + assert.strictEqual(step, 1); + step += 1; + + await writer.write(new Consumable("b")); + assert.strictEqual(step, 3); + step += 1; + + await writer.close(); + }); + + it("should pause the source stream while piping", async () => { + let step = 0; + + const stream = new MaybeConsumableWrapWritableStream( + new WritableStream({ + write(chunk) { + switch (step) { + case 0: + assert.strictEqual(chunk, "a"); + step += 1; + break; + case 2: + assert.strictEqual(chunk, "b"); + step += 1; + break; + } + }, + }), + ); + + const readable = new Consumable.ReadableStream({ + async start(controller) { + await controller.enqueue("a"); + assert.strictEqual(step, 1); + step += 1; + + await controller.enqueue("b"); + assert.strictEqual(step, 3); + step += 1; + + controller.close(); + }, + }); + + await readable.pipeTo(stream); + }); + }); +}); diff --git a/libraries/stream-extra/src/maybe-consumable/wrap-writable.ts b/libraries/stream-extra/src/maybe-consumable/wrap-writable.ts new file mode 100644 index 00000000..4ae3fc86 --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable/wrap-writable.ts @@ -0,0 +1,23 @@ +import type { MaybeConsumable } from "../maybe-consumable.js"; +import { WritableStream } from "../stream.js"; + +import { tryConsume } from "./utils.js"; + +export class MaybeConsumableWrapWritableStream extends WritableStream< + MaybeConsumable +> { + constructor(stream: WritableStream) { + const writer = stream.getWriter(); + super({ + write(chunk) { + return tryConsume(chunk, (chunk) => writer.write(chunk as T)); + }, + abort(reason) { + return writer.abort(reason); + }, + close() { + return writer.close(); + }, + }); + } +} diff --git a/libraries/stream-extra/src/maybe-consumable-ns.ts b/libraries/stream-extra/src/maybe-consumable/writable.ts similarity index 62% rename from libraries/stream-extra/src/maybe-consumable-ns.ts rename to libraries/stream-extra/src/maybe-consumable/writable.ts index fd53a56a..eaa03c11 100644 --- a/libraries/stream-extra/src/maybe-consumable-ns.ts +++ b/libraries/stream-extra/src/maybe-consumable/writable.ts @@ -1,27 +1,14 @@ -import { Consumable } from "./consumable.js"; -import type { MaybeConsumable } from "./maybe-consumable.js"; +import { Consumable } from "../consumable.js"; +import type { MaybeConsumable } from "../maybe-consumable.js"; import type { QueuingStrategy, WritableStreamDefaultController, -} from "./stream.js"; -import { WritableStream as NativeWritableStream } from "./stream.js"; +} from "../stream.js"; +import { WritableStream } from "../stream.js"; -export function getValue(value: MaybeConsumable): T { - return value instanceof Consumable ? value.value : value; -} +import { tryConsume } from "./utils.js"; -export function tryConsume( - value: T, - callback: (value: T extends Consumable ? U : T) => R, -): R { - if (value instanceof Consumable) { - return value.tryConsume(callback); - } else { - return callback(value as never); - } -} - -export interface WritableStreamSink { +export interface MaybeConsumableWritableStreamSink { start?( controller: WritableStreamDefaultController, ): void | PromiseLike; @@ -33,10 +20,13 @@ export interface WritableStreamSink { close?(): void | PromiseLike; } -export class WritableStream extends NativeWritableStream< +export class MaybeConsumableWritableStream extends WritableStream< MaybeConsumable > { - constructor(sink: WritableStreamSink, strategy?: QueuingStrategy) { + constructor( + sink: MaybeConsumableWritableStreamSink, + strategy?: QueuingStrategy, + ) { let wrappedStrategy: QueuingStrategy> | undefined; if (strategy) { wrappedStrategy = {}; @@ -57,8 +47,8 @@ export class WritableStream extends NativeWritableStream< start(controller) { return sink.start?.(controller); }, - async write(chunk, controller) { - await tryConsume(chunk, (chunk) => + write(chunk, controller) { + return tryConsume(chunk, (chunk) => sink.write?.(chunk as T, controller), ); },