feat(stream): add Consumable.WrapByteReadableStream and MaybeConsumable.WrapWritableStream

This commit is contained in:
Simon Chan 2025-06-16 13:50:18 +08:00
parent a835eb81b5
commit 40a60ca112
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
5 changed files with 45 additions and 1 deletions

View file

@ -0,0 +1,5 @@
---
"@yume-chan/stream-extra": minor
---
Add `Consumable.WrapByteReadableStream` and `MaybeConsumable.WrapWritableStream`

View file

@ -7,6 +7,7 @@ import type {
} from "./consumable/index.js"; } from "./consumable/index.js";
import { import {
ConsumableReadableStream, ConsumableReadableStream,
ConsumableWrapByteReadableStream,
ConsumableWrapWritableStream, ConsumableWrapWritableStream,
ConsumableWritableStream, ConsumableWritableStream,
} from "./consumable/index.js"; } from "./consumable/index.js";
@ -17,6 +18,7 @@ export class Consumable<T> {
static readonly WritableStream = ConsumableWritableStream; static readonly WritableStream = ConsumableWritableStream;
static readonly WrapWritableStream = ConsumableWrapWritableStream; static readonly WrapWritableStream = ConsumableWrapWritableStream;
static readonly ReadableStream = ConsumableReadableStream; static readonly ReadableStream = ConsumableReadableStream;
static readonly WrapByteReadableStream = ConsumableWrapByteReadableStream;
readonly #task: Task; readonly #task: Task;
readonly #resolver: PromiseResolver<void>; readonly #resolver: PromiseResolver<void>;
@ -75,4 +77,7 @@ export namespace Consumable {
ConsumableReadableStreamController<T>; ConsumableReadableStreamController<T>;
export type ReadableStreamSource<T> = ConsumableReadableStreamSource<T>; export type ReadableStreamSource<T> = ConsumableReadableStreamSource<T>;
export type ReadableStream<T> = typeof ConsumableReadableStream<T>; export type ReadableStream<T> = typeof ConsumableReadableStream<T>;
export type WrapByteReadableStream =
typeof ConsumableWrapByteReadableStream;
} }

View file

@ -1,3 +1,4 @@
export * from "./readable.js"; export * from "./readable.js";
export * from "./wrap-byte-readable.js";
export * from "./wrap-writable.js"; export * from "./wrap-writable.js";
export * from "./writable.js"; export * from "./writable.js";

View file

@ -0,0 +1,33 @@
import type { Consumable } from "../consumable.js";
import { ReadableStream } from "../stream.js";
import { ConsumableReadableStream } from "./readable.js";
export class ConsumableWrapByteReadableStream extends ReadableStream<
Consumable<Uint8Array>
> {
constructor(
stream: ReadableStream<Uint8Array>,
chunkSize: number,
min?: number,
) {
const reader = stream.getReader({ mode: "byob" });
let array = new Uint8Array(chunkSize);
super({
async pull(controller) {
const { done, value } = await reader.read(array, { min });
if (done) {
controller.close();
return;
}
await ConsumableReadableStream.enqueue(controller, value);
array = new Uint8Array(value.buffer);
},
cancel(reason) {
return reader.cancel(reason);
},
});
}
}

View file

@ -302,7 +302,7 @@ export declare class ReadableStreamBYOBReader {
* @public * @public
*/ */
export declare interface ReadableStreamBYOBReaderReadOptions { export declare interface ReadableStreamBYOBReaderReadOptions {
min?: number; min?: number | undefined;
} }
/** /**