feat(stream): add WrapWritableStream to Consumable and MaybeConsumable

This commit is contained in:
Simon Chan 2025-06-16 03:07:07 +08:00
parent d5f6720d11
commit 1d3d3c8864
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
14 changed files with 438 additions and 175 deletions

View file

@ -1,133 +1,22 @@
import { PromiseResolver, isPromiseLike } from "@yume-chan/async";
import type {
QueuingStrategy,
WritableStreamDefaultController,
WritableStreamDefaultWriter,
} from "./stream.js";
ConsumableReadableStreamController,
ConsumableReadableStreamSource,
ConsumableWritableStreamSink,
} from "./consumable/index.js";
import {
ReadableStream as NativeReadableStream,
WritableStream as NativeWritableStream,
} from "./stream.js";
ConsumableReadableStream,
ConsumableWrapWritableStream,
ConsumableWritableStream,
} from "./consumable/index.js";
import type { Task } from "./task.js";
import { createTask } from "./task.js";
// Workaround https://github.com/evanw/esbuild/issues/3923
class WritableStream<in T> extends NativeWritableStream<Consumable<T>> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(
sink: Consumable.WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((chunk) =>
sink.write?.(chunk, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static async enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(
source: Consumable.ReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedController:
| Consumable.ReadableStreamController<T>
| undefined;
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(chunk.value);
};
}
}
super(
{
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await ReadableStream.enqueue(controller, chunk);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
await source.start?.(wrappedController);
},
async pull() {
await source.pull?.(wrappedController!);
},
async cancel(reason) {
await source.cancel?.(reason);
},
},
wrappedStrategy,
);
}
}
export class Consumable<T> {
static readonly WritableStream = WritableStream;
static readonly ReadableStream = ReadableStream;
static readonly WritableStream = ConsumableWritableStream;
static readonly WrapWritableStream = ConsumableWrapWritableStream;
static readonly ReadableStream = ConsumableReadableStream;
readonly #task: Task;
readonly #resolver: PromiseResolver<void>;
@ -176,35 +65,14 @@ export class Consumable<T> {
}
export namespace Consumable {
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export type WritableStreamSink<T> = ConsumableWritableStreamSink<T>;
export type WritableStream<in T> = typeof ConsumableWritableStream<T>;
export type WritableStream<in T> = typeof Consumable.WritableStream<T>;
export type WrapWritableStream<in T> =
typeof ConsumableWrapWritableStream<T>;
export interface ReadableStreamController<T> {
enqueue(chunk: T): Promise<void>;
close(): void;
error(reason: unknown): void;
}
export interface ReadableStreamSource<T> {
start?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
pull?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
cancel?(reason: unknown): void | PromiseLike<void>;
}
export type ReadableStream<T> = typeof Consumable.ReadableStream<T>;
export type ReadableStreamController<T> =
ConsumableReadableStreamController<T>;
export type ReadableStreamSource<T> = ConsumableReadableStreamSource<T>;
export type ReadableStream<T> = typeof ConsumableReadableStream<T>;
}

View file

@ -0,0 +1,3 @@
export * from "./readable.js";
export * from "./wrap-writable.js";
export * from "./writable.js";

View file

@ -0,0 +1,80 @@
import { Consumable } from "../consumable.js";
import type { QueuingStrategy } from "../stream.js";
import { ReadableStream } from "../stream.js";
export interface ConsumableReadableStreamController<T> {
enqueue(chunk: T): Promise<void>;
close(): void;
error(reason: unknown): void;
}
export interface ConsumableReadableStreamSource<T> {
start?(
controller: ConsumableReadableStreamController<T>,
): void | PromiseLike<void>;
pull?(
controller: ConsumableReadableStreamController<T>,
): void | PromiseLike<void>;
cancel?(reason: unknown): void | PromiseLike<void>;
}
export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
static async enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(
source: ConsumableReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedController!: ConsumableReadableStreamController<T>;
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(chunk.value);
};
}
}
super(
{
start(controller) {
wrappedController = {
enqueue(chunk) {
return ConsumableReadableStream.enqueue(
controller,
chunk,
);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
return source.start?.(wrappedController);
},
pull() {
return source.pull?.(wrappedController);
},
cancel(reason) {
return source.cancel?.(reason);
},
},
wrappedStrategy,
);
}
}

View file

@ -0,0 +1,41 @@
import * as assert from "node:assert";
import { describe, it } from "node:test";
describe("Consumable", () => {
describe("WritableStream", () => {
it("should not pause the source stream while piping", async () => {
let step = 0;
const stream = new WritableStream<string>({
write(chunk) {
switch (step) {
case 2:
assert.strictEqual(chunk, "a");
step += 1;
break;
case 3:
assert.strictEqual(chunk, "b");
step += 1;
break;
}
},
});
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("a");
assert.strictEqual(step, 0);
step += 1;
controller.enqueue("b");
assert.strictEqual(step, 1);
step += 1;
controller.close();
},
});
await readable.pipeTo(stream);
});
});
});

View file

@ -0,0 +1,21 @@
import type { Consumable } from "../consumable.js";
import { WritableStream } from "../stream.js";
export class ConsumableWrapWritableStream<in T> extends WritableStream<
Consumable<T>
> {
constructor(stream: WritableStream<T>) {
const writer = stream.getWriter();
super({
write(chunk) {
return chunk.tryConsume((chunk) => writer.write(chunk));
},
abort(reason) {
return writer.abort(reason);
},
close() {
return writer.close();
},
});
}
}

View file

@ -0,0 +1,72 @@
import { Consumable } from "../consumable.js";
import type {
QueuingStrategy,
WritableStreamDefaultController,
WritableStreamDefaultWriter,
} from "../stream.js";
import { WritableStream } from "../stream.js";
export interface ConsumableWritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class ConsumableWritableStream<in T> extends WritableStream<
Consumable<T>
> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(
sink: ConsumableWritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
write(chunk, controller) {
return chunk.tryConsume((chunk) =>
sink.write?.(chunk, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}

View file

@ -0,0 +1,22 @@
import assert from "node:assert";
import { describe, it } from "node:test";
import { MaybeConsumable } from "./maybe-consumable.js";
describe("MaybeConsumable", () => {
it("should export all symbols", () => {
assert(
!!MaybeConsumable.WrapWritableStream,
"WrapWritableStream should be define",
);
assert(
!!MaybeConsumable.WritableStream,
"WritableStream should be define",
);
assert(!!MaybeConsumable.getValue, "getValue should be define");
assert(!!MaybeConsumable.tryConsume, "tryConsume should be define");
});
});

View file

@ -2,4 +2,4 @@ import type { Consumable } from "./consumable.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export * as MaybeConsumable from "./maybe-consumable-ns.js";
export * as MaybeConsumable from "./maybe-consumable/index.js";

View file

@ -0,0 +1,4 @@
export * from "./utils.js";
export { MaybeConsumableWrapWritableStream as WrapWritableStream } from "./wrap-writable.js";
export { MaybeConsumableWritableStream as WritableStream } from "./writable.js";
export type { MaybeConsumableWritableStreamSink as WritableStreamSink } from "./writable.js";

View file

@ -0,0 +1,42 @@
import * as assert from "node:assert";
import { describe, it } from "node:test";
import { Consumable } from "../consumable.js";
import { getValue, tryConsume } from "./utils.js";
describe("MaybeConsumable", () => {
describe("getValue", () => {
it("should return the original value if it's not Consumable", () => {
const value = {};
assert.strictEqual(getValue(value), value);
});
it("should return the inner value if it's Consumable", () => {
const value = new Consumable({});
assert.strictEqual(getValue(value), value.value);
});
it("should return undefined for undefined", () => {
assert.strictEqual(getValue(undefined), undefined);
});
});
describe("tryConsume", () => {
it("should invoke the callback with the original value if it's not Consumable", () => {
const value = {};
const callback = (got: unknown) => {
assert.strictEqual(got, value);
};
tryConsume(value, callback);
});
it("should invoke the callback with the inner value if it's Consumable", () => {
const value = new Consumable({});
const callback = (got: unknown) => {
assert.strictEqual(got, value.value);
};
tryConsume(value, callback);
});
});
});

View file

@ -0,0 +1,17 @@
import { Consumable } from "../consumable.js";
import type { MaybeConsumable } from "../maybe-consumable.js";
export function getValue<T>(value: MaybeConsumable<T>): T {
return value instanceof Consumable ? value.value : value;
}
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R,
): R {
if (value instanceof Consumable) {
return value.tryConsume(callback);
} else {
return callback(value as never);
}
}

View file

@ -0,0 +1,80 @@
import * as assert from "node:assert";
import { describe, it } from "node:test";
import { Consumable } from "../consumable.js";
import { WritableStream } from "../stream.js";
import { MaybeConsumableWrapWritableStream } from "./wrap-writable.js";
describe("MaybeConsumable", () => {
describe("WrapWritableStream", () => {
it("should write to inner stream", async () => {
let step = 0;
const stream = new MaybeConsumableWrapWritableStream(
new WritableStream({
write(chunk) {
switch (step) {
case 0:
assert.strictEqual(chunk, "a");
step += 1;
break;
case 2:
assert.strictEqual(chunk, "b");
step += 1;
break;
}
},
}),
);
const writer = stream.getWriter();
await writer.write("a");
assert.strictEqual(step, 1);
step += 1;
await writer.write(new Consumable("b"));
assert.strictEqual(step, 3);
step += 1;
await writer.close();
});
it("should pause the source stream while piping", async () => {
let step = 0;
const stream = new MaybeConsumableWrapWritableStream<string>(
new WritableStream({
write(chunk) {
switch (step) {
case 0:
assert.strictEqual(chunk, "a");
step += 1;
break;
case 2:
assert.strictEqual(chunk, "b");
step += 1;
break;
}
},
}),
);
const readable = new Consumable.ReadableStream<string>({
async start(controller) {
await controller.enqueue("a");
assert.strictEqual(step, 1);
step += 1;
await controller.enqueue("b");
assert.strictEqual(step, 3);
step += 1;
controller.close();
},
});
await readable.pipeTo(stream);
});
});
});

View file

@ -0,0 +1,23 @@
import type { MaybeConsumable } from "../maybe-consumable.js";
import { WritableStream } from "../stream.js";
import { tryConsume } from "./utils.js";
export class MaybeConsumableWrapWritableStream<T> extends WritableStream<
MaybeConsumable<T>
> {
constructor(stream: WritableStream<T>) {
const writer = stream.getWriter();
super({
write(chunk) {
return tryConsume(chunk, (chunk) => writer.write(chunk as T));
},
abort(reason) {
return writer.abort(reason);
},
close() {
return writer.close();
},
});
}
}

View file

@ -1,27 +1,14 @@
import { Consumable } from "./consumable.js";
import type { MaybeConsumable } from "./maybe-consumable.js";
import { Consumable } from "../consumable.js";
import type { MaybeConsumable } from "../maybe-consumable.js";
import type {
QueuingStrategy,
WritableStreamDefaultController,
} from "./stream.js";
import { WritableStream as NativeWritableStream } from "./stream.js";
} from "../stream.js";
import { WritableStream } from "../stream.js";
export function getValue<T>(value: MaybeConsumable<T>): T {
return value instanceof Consumable ? value.value : value;
}
import { tryConsume } from "./utils.js";
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R,
): R {
if (value instanceof Consumable) {
return value.tryConsume(callback);
} else {
return callback(value as never);
}
}
export interface WritableStreamSink<in T> {
export interface MaybeConsumableWritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
@ -33,10 +20,13 @@ export interface WritableStreamSink<in T> {
close?(): void | PromiseLike<void>;
}
export class WritableStream<in T> extends NativeWritableStream<
export class MaybeConsumableWritableStream<in T> extends WritableStream<
MaybeConsumable<T>
> {
constructor(sink: WritableStreamSink<T>, strategy?: QueuingStrategy<T>) {
constructor(
sink: MaybeConsumableWritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<MaybeConsumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
@ -57,8 +47,8 @@ export class WritableStream<in T> extends NativeWritableStream<
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await tryConsume(chunk, (chunk) =>
write(chunk, controller) {
return tryConsume(chunk, (chunk) =>
sink.write?.(chunk as T, controller),
);
},