feat: make consumable writable streams also accept raw values

This commit is contained in:
Simon Chan 2024-04-16 14:01:30 +08:00
parent d60ac95f47
commit d06b5f2ed6
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
31 changed files with 387 additions and 346 deletions

View file

@ -6,10 +6,7 @@ import "source-map-support/register.js";
import { Adb, AdbServerClient } from "@yume-chan/adb";
import { AdbServerNodeTcpConnector } from "@yume-chan/adb-server-node-tcp";
import {
ConsumableWritableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import { WritableStream } from "@yume-chan/stream-extra";
import { program } from "commander";
program
@ -142,7 +139,7 @@ createDeviceCommand("shell [args...]")
process.stdin.setRawMode(true);
process.stdin.on("data", (data: Uint8Array) => {
ConsumableWritableStream.write(stdinWriter, data).catch((e) => {
stdinWriter.write(data).catch((e) => {
console.error(e);
process.exit(1);
});

View file

@ -14,8 +14,8 @@ import type {
WritableStream,
} from "@yume-chan/stream-extra";
import {
ConsumableWritableStream,
DuplexStreamFactory,
MaybeConsumable,
ReadableStream,
pipeFrom,
} from "@yume-chan/stream-extra";
@ -188,7 +188,7 @@ export class AdbDaemonWebUsbConnection
const zeroMask = outEndpoint.packetSize - 1;
this.#writable = pipeFrom(
duplex.createWritable(
new ConsumableWritableStream({
new MaybeConsumable.WritableStream({
write: async (chunk) => {
try {
await device.raw.transferOut(

View file

@ -23,6 +23,7 @@ import {
} from "@yume-chan/scrcpy";
import type {
Consumable,
MaybeConsumable,
ReadableStream,
ReadableWritablePair,
} from "@yume-chan/stream-extra";
@ -100,7 +101,7 @@ export type AdbScrcpyAudioStreamMetadata =
export class AdbScrcpyClient {
static async pushServer(
adb: Adb,
file: ReadableStream<Consumable<Uint8Array>>,
file: ReadableStream<MaybeConsumable<Uint8Array>>,
filename = DEFAULT_SERVER_PATH,
) {
const sync = await adb.sync();

View file

@ -8,8 +8,8 @@ import type {
AdbServerConnector,
} from "@yume-chan/adb";
import {
MaybeConsumable,
PushReadableStream,
UnwrapConsumableStream,
WrapWritableStream,
WritableStream,
} from "@yume-chan/stream-extra";
@ -101,7 +101,7 @@ export class AdbServerNodeTcpConnector implements AdbServerConnector {
readable: connection.readable,
writable: new WrapWritableStream(
connection.writable,
).bePipedThroughFrom(new UnwrapConsumableStream()),
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
get closed() {
return connection.closed;
},

View file

@ -1,4 +1,7 @@
import type { Consumable, ReadableWritablePair } from "@yume-chan/stream-extra";
import type {
MaybeConsumable,
ReadableWritablePair,
} from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
import type { ValueOrPromise } from "@yume-chan/struct";
@ -20,7 +23,7 @@ export interface Closeable {
}
export interface AdbSocket
extends ReadableWritablePair<Uint8Array, Consumable<Uint8Array>>,
extends ReadableWritablePair<Uint8Array, MaybeConsumable<Uint8Array>>,
Closeable {
get service(): string;

View file

@ -1,4 +1,4 @@
import type { Consumable, WritableStream } from "@yume-chan/stream-extra";
import type { MaybeConsumable, WritableStream } from "@yume-chan/stream-extra";
import { ReadableStream } from "@yume-chan/stream-extra";
import type { Adb, AdbSocket } from "../../../adb.js";
@ -36,7 +36,7 @@ export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol {
readonly #socket: AdbSocket;
// Legacy shell forwards all data to stdin.
get stdin(): WritableStream<Consumable<Uint8Array>> {
get stdin(): WritableStream<MaybeConsumable<Uint8Array>> {
return this.#socket.writable;
}

View file

@ -1,15 +1,12 @@
import { PromiseResolver } from "@yume-chan/async";
import type {
Consumable,
PushReadableStreamController,
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
ConsumableWritableStream,
MaybeConsumable,
PushReadableStream,
StructDeserializeStream,
WritableStream,
type PushReadableStreamController,
type ReadableStream,
type WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import type { StructValueType } from "@yume-chan/struct";
import Struct, { placeholder } from "@yume-chan/struct";
@ -64,9 +61,9 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol {
}
readonly #socket: AdbSocket;
#writer: WritableStreamDefaultWriter<Consumable<Uint8Array>>;
#writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
#stdin: WritableStream<Consumable<Uint8Array>>;
#stdin: WritableStream<MaybeConsumable<Uint8Array>>;
get stdin() {
return this.#stdin;
}
@ -140,23 +137,22 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol {
this.#writer = this.#socket.writable.getWriter();
this.#stdin = new WritableStream<Consumable<Uint8Array>>({
this.#stdin = new MaybeConsumable.WritableStream<Uint8Array>({
write: async (chunk) => {
await ConsumableWritableStream.write(
this.#writer,
await MaybeConsumable.tryConsume(chunk, async (chunk) => {
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdin,
data: chunk.value,
data: chunk,
}),
);
chunk.consume();
});
},
});
}
async resize(rows: number, cols: number) {
await ConsumableWritableStream.write(
this.#writer,
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.WindowSizeChange,
// The "correct" format is `${rows}x${cols},${x_pixels}x${y_pixels}`

View file

@ -1,5 +1,5 @@
import type {
Consumable,
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
@ -11,7 +11,7 @@ export interface AdbSubprocessProtocol {
/**
* A WritableStream that writes to the `stdin` stream.
*/
readonly stdin: WritableStream<Consumable<Uint8Array>>;
readonly stdin: WritableStream<MaybeConsumable<Uint8Array>>;
/**
* The `stdout` stream of the process.

View file

@ -1,8 +1,8 @@
import type { Consumable, ReadableStream } from "@yume-chan/stream-extra";
import {
AbortController,
ConsumableWritableStream,
DistributionStream,
MaybeConsumable,
type ReadableStream,
} from "@yume-chan/stream-extra";
import Struct, { placeholder } from "@yume-chan/struct";
@ -18,7 +18,7 @@ export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024;
export interface AdbSyncPushV1Options {
socket: AdbSyncSocket;
filename: string;
file: ReadableStream<Consumable<Uint8Array>>;
file: ReadableStream<MaybeConsumable<Uint8Array>>;
type?: LinuxFileType;
permission?: number;
mtime?: number;
@ -31,7 +31,7 @@ export const AdbSyncOkResponse = new Struct({ littleEndian: true }).uint32(
async function pipeFileData(
locked: AdbSyncSocketLocked,
file: ReadableStream<Consumable<Uint8Array>>,
file: ReadableStream<MaybeConsumable<Uint8Array>>,
packetSize: number,
mtime: number,
) {
@ -40,7 +40,7 @@ async function pipeFileData(
const abortController = new AbortController();
file.pipeThrough(new DistributionStream(packetSize, true))
.pipeTo(
new ConsumableWritableStream({
new MaybeConsumable.WritableStream({
write: async (chunk) => {
await adbSyncWriteRequest(
locked,

View file

@ -19,10 +19,6 @@ export const AdbSyncNumberRequest = new Struct({ littleEndian: true })
.string("id", { length: 4 })
.uint32("arg");
export const AdbSyncDataRequest = new Struct({ littleEndian: true })
.concat(AdbSyncNumberRequest)
.uint8Array("data", { lengthField: "arg" });
export interface AdbSyncWritable {
write(buffer: Uint8Array): Promise<void>;
}
@ -33,22 +29,21 @@ export async function adbSyncWriteRequest(
value: number | string | Uint8Array,
): Promise<void> {
if (typeof value === "number") {
const buffer = AdbSyncNumberRequest.serialize({
id,
arg: value,
});
await writable.write(buffer);
} else if (typeof value === "string") {
// Let `writable` buffer writes
const buffer = encodeUtf8(value);
await writable.write(
AdbSyncNumberRequest.serialize({ id, arg: buffer.byteLength }),
AdbSyncNumberRequest.serialize({ id, arg: value }),
);
await writable.write(buffer);
} else {
return;
}
if (typeof value === "string") {
value = encodeUtf8(value);
}
// `writable` will copy inputs to an internal buffer,
// so we write header and `buffer` separately,
// to avoid an extra copy.
await writable.write(
AdbSyncNumberRequest.serialize({ id, arg: value.byteLength }),
);
await writable.write(value);
}
}

View file

@ -1,11 +1,11 @@
import type {
Consumable,
MaybeConsumable,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
BufferCombiner,
BufferedReadableStream,
ConsumableWritableStream,
Consumable,
} from "@yume-chan/stream-extra";
import type { AsyncExactReadable } from "@yume-chan/struct";
@ -13,7 +13,7 @@ import type { AdbSocket } from "../../adb.js";
import { AutoResetEvent } from "../../utils/index.js";
export class AdbSyncSocketLocked implements AsyncExactReadable {
readonly #writer: WritableStreamDefaultWriter<Consumable<Uint8Array>>;
readonly #writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
readonly #readable: BufferedReadableStream;
readonly #socketLock: AutoResetEvent;
readonly #writeLock = new AutoResetEvent();
@ -24,7 +24,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
}
constructor(
writer: WritableStreamDefaultWriter<Consumable<Uint8Array>>,
writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>,
readable: BufferedReadableStream,
bufferSize: number,
lock: AutoResetEvent,
@ -35,8 +35,8 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
this.#combiner = new BufferCombiner(bufferSize);
}
async #writeInnerStream(buffer: Uint8Array) {
await ConsumableWritableStream.write(this.#writer, buffer);
async #writeConsumable(buffer: Uint8Array) {
await Consumable.WritableStream.write(this.#writer, buffer);
}
async flush() {
@ -44,7 +44,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
await this.#writeLock.wait();
const buffer = this.#combiner.flush();
if (buffer) {
await this.#writeInnerStream(buffer);
await this.#writeConsumable(buffer);
}
} finally {
this.#writeLock.notifyOne();
@ -55,7 +55,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
try {
await this.#writeLock.wait();
for (const buffer of this.#combiner.push(data)) {
await this.#writeInnerStream(buffer);
await this.#writeConsumable(buffer);
}
} finally {
this.#writeLock.notifyOne();

View file

@ -1,5 +1,5 @@
import { AutoDisposable } from "@yume-chan/event";
import type { Consumable, ReadableStream } from "@yume-chan/stream-extra";
import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra";
import type { Adb, AdbSocket } from "../../adb.js";
import { AdbFeature } from "../../features.js";
@ -31,7 +31,7 @@ export function dirname(path: string): string {
export interface AdbSyncWriteOptions {
filename: string;
file: ReadableStream<Consumable<Uint8Array>>;
file: ReadableStream<MaybeConsumable<Uint8Array>>;
type?: LinuxFileType;
permission?: number;
mtime?: number;

View file

@ -3,15 +3,12 @@ import {
PromiseResolver,
delay,
} from "@yume-chan/async";
import type {
Consumable,
ReadableWritablePair,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
AbortController,
ConsumableWritableStream,
Consumable,
WritableStream,
type ReadableWritablePair,
type WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import { EMPTY_UINT8_ARRAY, NumberFieldType } from "@yume-chan/struct";
@ -231,7 +228,10 @@ export class AdbPacketDispatcher implements Closeable {
let payload: Uint8Array;
if (this.options.initialDelayedAckBytes !== 0) {
payload = new Uint8Array(4);
new DataView(payload.buffer).setUint32(0, ackBytes, true);
payload[0] = ackBytes & 0xff;
payload[1] = (ackBytes >> 8) & 0xff;
payload[2] = (ackBytes >> 16) & 0xff;
payload[3] = (ackBytes >> 24) & 0xff;
} else {
payload = EMPTY_UINT8_ARRAY;
}
@ -374,7 +374,7 @@ export class AdbPacketDispatcher implements Closeable {
throw new Error("payload too large");
}
await ConsumableWritableStream.write(this.#writer, {
await Consumable.WritableStream.write(this.#writer, {
command,
arg0,
arg1,

View file

@ -1,4 +1,8 @@
import { ConsumableTransformStream } from "@yume-chan/stream-extra";
import type { Consumable } from "@yume-chan/stream-extra";
import {
ConsumableReadableStream,
TransformStream,
} from "@yume-chan/stream-extra";
import Struct from "@yume-chan/struct";
export enum AdbCommand {
@ -49,26 +53,33 @@ export function calculateChecksum(payload: Uint8Array): number {
return payload.reduce((result, item) => result + item, 0);
}
export class AdbPacketSerializeStream extends ConsumableTransformStream<
AdbPacketInit,
Uint8Array
export class AdbPacketSerializeStream extends TransformStream<
Consumable<AdbPacketInit>,
Consumable<Uint8Array>
> {
constructor() {
const headerBuffer = new Uint8Array(AdbPacketHeader.size);
super({
transform: async (chunk, controller) => {
await chunk.tryConsume(async (chunk) => {
const init = chunk as AdbPacketInit & AdbPacketHeaderInit;
init.payloadLength = init.payload.byteLength;
AdbPacketHeader.serialize(init, headerBuffer);
await controller.enqueue(headerBuffer);
await ConsumableReadableStream.enqueue(
controller,
AdbPacketHeader.serialize(init, headerBuffer),
);
if (init.payload.byteLength) {
// USB protocol preserves packet boundaries,
// so we must write payload separately as native ADB does,
// otherwise the read operation on device will fail.
await controller.enqueue(init.payload);
await ConsumableReadableStream.enqueue(
controller,
init.payload,
);
}
});
},
});
}

View file

@ -1,16 +1,13 @@
import { PromiseResolver } from "@yume-chan/async";
import type { Disposable } from "@yume-chan/event";
import type {
AbortSignal,
Consumable,
PushReadableStreamController,
ReadableStream,
WritableStream,
WritableStreamDefaultController,
} from "@yume-chan/stream-extra";
import {
ConsumableWritableStream,
MaybeConsumable,
PushReadableStream,
type AbortSignal,
type PushReadableStreamController,
type ReadableStream,
type WritableStream,
type WritableStreamDefaultController,
} from "@yume-chan/stream-extra";
import type { AdbSocket } from "../adb.js";
@ -50,7 +47,7 @@ export class AdbDaemonSocketController
}
#writableController!: WritableStreamDefaultController;
readonly writable: WritableStream<Consumable<Uint8Array>>;
readonly writable: WritableStream<MaybeConsumable<Uint8Array>>;
#closed = false;
@ -86,7 +83,7 @@ export class AdbDaemonSocketController
this.#readableController = controller;
});
this.writable = new ConsumableWritableStream<Uint8Array>({
this.writable = new MaybeConsumable.WritableStream<Uint8Array>({
start: (controller) => {
this.#writableController = controller;
},
@ -216,7 +213,7 @@ export class AdbDaemonSocket implements AdbDaemonSocketInfo, AdbSocket {
get readable(): ReadableStream<Uint8Array> {
return this.#controller.readable;
}
get writable(): WritableStream<Consumable<Uint8Array>> {
get writable(): WritableStream<MaybeConsumable<Uint8Array>> {
return this.#controller.writable;
}

View file

@ -1,9 +1,9 @@
import { PromiseResolver } from "@yume-chan/async";
import type { Consumable, ReadableWritablePair } from "@yume-chan/stream-extra";
import {
AbortController,
ConsumableWritableStream,
Consumable,
WritableStream,
type ReadableWritablePair,
} from "@yume-chan/stream-extra";
import type { ValueOrPromise } from "@yume-chan/struct";
import { decodeUtf8, encodeUtf8 } from "@yume-chan/struct";
@ -187,7 +187,10 @@ export class AdbDaemonTransport implements AdbTransport {
// Because we don't know if the device needs it or not.
(init as AdbPacketInit).checksum = calculateChecksum(init.payload);
(init as AdbPacketInit).magic = init.command ^ 0xffffffff;
await ConsumableWritableStream.write(writer, init as AdbPacketInit);
await Consumable.WritableStream.write(
writer,
init as AdbPacketInit,
);
}
const actualFeatures = features.slice();

View file

@ -8,7 +8,7 @@ import type {
} from "@yume-chan/stream-extra";
import {
BufferedReadableStream,
UnwrapConsumableStream,
MaybeConsumable,
WrapWritableStream,
} from "@yume-chan/stream-extra";
import type {
@ -408,7 +408,7 @@ export class AdbServerClient {
readable: readable.release(),
writable: new WrapWritableStream(
connection.writable,
).bePipedThroughFrom(new UnwrapConsumableStream()),
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
get closed() {
return connection.closed;
},

View file

@ -1,5 +1,5 @@
import { AdbCommandBase } from "@yume-chan/adb";
import type { Consumable, ReadableStream } from "@yume-chan/stream-extra";
import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
export interface AdbBackupOptions {
@ -15,7 +15,7 @@ export interface AdbBackupOptions {
export interface AdbRestoreOptions {
user: number;
file: ReadableStream<Consumable<Uint8Array>>;
file: ReadableStream<MaybeConsumable<Uint8Array>>;
}
export class AdbBackup extends AdbCommandBase {

View file

@ -5,7 +5,7 @@
import type { Adb } from "@yume-chan/adb";
import { AdbCommandBase, escapeArg } from "@yume-chan/adb";
import type { Consumable, ReadableStream } from "@yume-chan/stream-extra";
import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra";
import {
ConcatStringStream,
DecodeUtf8Stream,
@ -295,7 +295,7 @@ export class PackageManager extends AdbCommandBase {
}
async pushAndInstallStream(
stream: ReadableStream<Consumable<Uint8Array>>,
stream: ReadableStream<MaybeConsumable<Uint8Array>>,
options?: Partial<PackageManagerInstallOptions>,
): Promise<void> {
const sync = await this.adb.sync();
@ -334,7 +334,7 @@ export class PackageManager extends AdbCommandBase {
async installStream(
size: number,
stream: ReadableStream<Consumable<Uint8Array>>,
stream: ReadableStream<MaybeConsumable<Uint8Array>>,
options?: Partial<PackageManagerInstallOptions>,
): Promise<void> {
// Android 7 added both `cmd` command and streaming install support,
@ -548,7 +548,7 @@ export class PackageManager extends AdbCommandBase {
sessionId: number,
splitName: string,
size: number,
stream: ReadableStream<Consumable<Uint8Array>>,
stream: ReadableStream<MaybeConsumable<Uint8Array>>,
) {
const args: string[] = [
"pm",
@ -625,7 +625,7 @@ export class PackageManagerInstallSession {
addSplitStream(
splitName: string,
size: number,
stream: ReadableStream<Consumable<Uint8Array>>,
stream: ReadableStream<MaybeConsumable<Uint8Array>>,
) {
return this.#packageManager.sessionAddSplitStream(
this.#id,

View file

@ -274,7 +274,7 @@ export class HidKeyboard {
static readonly REPORT_SIZE = 8;
#modifiers = 0;
#keys: Set<HidKeyCode> = new Set();
#keys = new Set<HidKeyCode>();
down(key: HidKeyCode) {
if (key >= HidKeyCode.ControlLeft && key <= HidKeyCode.MetaRight) {

View file

@ -85,7 +85,7 @@ export class HidTouchScreen {
static readonly DESCRIPTOR = DESCRIPTOR;
#fingers: Map<number, Finger> = new Map();
#fingers = new Map<number, Finger>();
down(id: number, x: number, y: number) {
if (this.#fingers.size >= 10) {

View file

@ -1,8 +1,7 @@
import type {
import {
Consumable,
WritableStreamDefaultWriter,
type WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import { ConsumableWritableStream } from "@yume-chan/stream-extra";
import type { ScrcpyOptions } from "../options/index.js";
@ -29,7 +28,7 @@ export class ScrcpyControlMessageWriter {
}
async write(message: Uint8Array) {
await ConsumableWritableStream.write(this.#writer, message);
await Consumable.WritableStream.write(this.#writer, message);
}
async injectKeyCode(

View file

@ -5,7 +5,10 @@ import type {
WritableStreamDefaultController,
WritableStreamDefaultWriter,
} from "./stream.js";
import { ReadableStream, TransformStream, WritableStream } from "./stream.js";
import {
WritableStream as NativeWritableStream,
ReadableStream,
} from "./stream.js";
interface Task {
run<T>(callback: () => T): T;
@ -29,6 +32,10 @@ const createTask: Console["createTask"] =
},
}));
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
return typeof value === "object" && value !== null && "then" in value;
}
export class Consumable<T> {
readonly #task: Task;
readonly #resolver: PromiseResolver<void>;
@ -51,11 +58,23 @@ export class Consumable<T> {
this.#resolver.reject(error);
}
async tryConsume<U>(callback: (value: T) => U) {
tryConsume<U>(callback: (value: T) => U) {
try {
// eslint-disable-next-line @typescript-eslint/await-thenable
const result = await this.#task.run(() => callback(this.value));
let result = this.#task.run(() => callback(this.value));
if (isPromiseLike(result)) {
result = result.then(
(value) => {
this.#resolver.resolve();
return value;
},
(e) => {
this.#resolver.reject(e);
throw e;
},
) as U;
} else {
this.#resolver.resolve();
}
return result;
} catch (e) {
this.#resolver.reject(e);
@ -64,36 +83,70 @@ export class Consumable<T> {
}
}
async function enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
export namespace Consumable {
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
export class WrapConsumableStream<T> extends TransformStream<T, Consumable<T>> {
constructor() {
super({
async transform(chunk, controller) {
await enqueue(controller, chunk);
},
});
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
}
export class UnwrapConsumableStream<T> extends TransformStream<
Consumable<T>,
T
> {
constructor() {
super({
transform(chunk, controller) {
controller.enqueue(chunk.value);
chunk.consume();
export class WritableStream<in T> extends NativeWritableStream<
Consumable<T>
> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(
sink: WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
});
async write(chunk, controller) {
await chunk.tryConsume((chunk) =>
sink.write?.(chunk, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
}
@ -114,6 +167,15 @@ export interface ConsumableReadableStreamSource<T> {
}
export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
static async enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(
source: ConsumableReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
@ -140,7 +202,10 @@ export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await enqueue(controller, chunk);
await ConsumableReadableStream.enqueue(
controller,
chunk,
);
},
close() {
controller.close();
@ -163,129 +228,3 @@ export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
);
}
}
export interface ConsumableWritableStreamSink<T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class ConsumableWritableStream<T> extends WritableStream<Consumable<T>> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(
sink: ConsumableWritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(chunk.value);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((value) =>
sink.write?.(value, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
export interface ConsumableTransformer<I, O> {
start?(
controller: ConsumableReadableStreamController<O>,
): void | PromiseLike<void>;
transform?(
chunk: I,
controller: ConsumableReadableStreamController<O>,
): void | PromiseLike<void>;
flush?(
controller: ConsumableReadableStreamController<O>,
): void | PromiseLike<void>;
}
export class ConsumableTransformStream<I, O> extends TransformStream<
Consumable<I>,
Consumable<O>
> {
constructor(transformer: ConsumableTransformer<I, O>) {
let wrappedController:
| ConsumableReadableStreamController<O>
| undefined;
super({
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await enqueue(controller, chunk);
},
close() {
controller.terminate();
},
error(reason) {
controller.error(reason);
},
};
await transformer.start?.(wrappedController);
},
async transform(chunk) {
await chunk.tryConsume((value) =>
transformer.transform?.(value, wrappedController!),
);
chunk.consume();
},
async flush() {
await transformer.flush?.(wrappedController!);
},
});
}
}
export class ConsumableInspectStream<T> extends TransformStream<
Consumable<T>,
Consumable<T>
> {
constructor(callback: (value: T) => void) {
super({
transform(chunk, controller) {
callback(chunk.value);
controller.enqueue(chunk);
},
});
}
}

View file

@ -1,10 +1,8 @@
import { describe, expect, it, jest } from "@jest/globals";
import {
ConsumableReadableStream,
ConsumableWritableStream,
} from "./consumable.js";
import { ConsumableReadableStream } from "./consumable.js";
import { DistributionStream } from "./distribution.js";
import { MaybeConsumable } from "./maybe-consumable.js";
const TestData = new Uint8Array(50);
for (let i = 0; i < 50; i += 1) {
@ -32,7 +30,7 @@ async function testInputOutput(
})
.pipeThrough(new DistributionStream(10, combine || undefined))
.pipeTo(
new ConsumableWritableStream({
new MaybeConsumable.WritableStream({
write(chunk) {
// chunk will be reused, so we need to copy it
write(chunk.slice());

View file

@ -1,4 +1,6 @@
import { ConsumableTransformStream } from "./consumable.js";
import { ConsumableReadableStream } from "./consumable.js";
import { MaybeConsumable } from "./maybe-consumable.js";
import { TransformStream } from "./stream.js";
/**
* Splits or combines buffers to specified size.
@ -77,34 +79,42 @@ export class BufferCombiner {
}
}
export class DistributionStream extends ConsumableTransformStream<
Uint8Array,
Uint8Array
export class DistributionStream extends TransformStream<
MaybeConsumable<Uint8Array>,
MaybeConsumable<Uint8Array>
> {
constructor(size: number, combine = false) {
const combiner = combine ? new BufferCombiner(size) : undefined;
super({
async transform(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, async (chunk) => {
if (combiner) {
for (const buffer of combiner.push(chunk)) {
await controller.enqueue(buffer);
await ConsumableReadableStream.enqueue(
controller,
buffer,
);
}
} else {
let offset = 0;
let available = chunk.byteLength;
while (available > 0) {
const end = offset + size;
await controller.enqueue(chunk.subarray(offset, end));
await ConsumableReadableStream.enqueue(
controller,
chunk.subarray(offset, end),
);
offset = end;
available -= size;
}
}
});
},
async flush(controller) {
flush(controller) {
if (combiner) {
const data = combiner.flush();
if (data) {
await controller.enqueue(data);
controller.enqueue(data);
}
}
},

View file

@ -6,6 +6,7 @@ export * from "./decode-utf8.js";
export * from "./distribution.js";
export * from "./duplex.js";
export * from "./inspect.js";
export * from "./maybe-consumable.js";
export * from "./pipe-from.js";
export * from "./push-readable.js";
export * from "./split-string.js";

View file

@ -0,0 +1,95 @@
import { Consumable } from "./consumable.js";
import type { WritableStreamDefaultController } from "./stream.js";
import {
WritableStream as NativeWritableStream,
TransformStream,
type QueuingStrategy,
} from "./stream.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export namespace MaybeConsumable {
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R,
): R {
if (value instanceof Consumable) {
return value.tryConsume(callback);
} else {
return callback(value as never);
}
}
export class UnwrapStream<T> extends TransformStream<
MaybeConsumable<T>,
T
> {
constructor() {
super({
transform(chunk, controller) {
MaybeConsumable.tryConsume(chunk, (chunk) => {
controller.enqueue(chunk as T);
});
},
});
}
}
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class WritableStream<in T> extends NativeWritableStream<
MaybeConsumable<T>
> {
constructor(
sink: WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy:
| QueuingStrategy<MaybeConsumable<T>>
| undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, (chunk) =>
sink.write?.(chunk as T, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
}

View file

@ -36,10 +36,10 @@ const Global = globalThis as unknown as GlobalExtension;
export const AbortController = Global.AbortController;
export type ReadableStream<T> = ReadableStreamType<T>;
export type ReadableStream<out T> = ReadableStreamType<T>;
export const ReadableStream = Global.ReadableStream;
export type WritableStream<T> = WritableStreamType<T>;
export type WritableStream<in T> = WritableStreamType<T>;
export const WritableStream = Global.WritableStream;
export type TransformStream<I, O> = TransformStreamType<I, O>;

View file

@ -136,7 +136,7 @@ export declare class ReadableByteStreamController {
*
* @public
*/
export declare class ReadableStream<R> implements AsyncIterable<R> {
export declare class ReadableStream<out R> implements AsyncIterable<R> {
constructor(
underlyingSource: UnderlyingByteSource,
strategy?: {
@ -380,7 +380,7 @@ export declare class ReadableStreamDefaultController<R> {
*
* @public
*/
export declare class ReadableStreamDefaultReader<R> {
export declare class ReadableStreamDefaultReader<out R> {
constructor(stream: ReadableStream<R>);
/**
* Returns a promise that will be fulfilled when the stream becomes closed,
@ -446,7 +446,7 @@ export declare interface ReadableStreamIteratorOptions {
}
/**
* A common interface for a `ReadadableStream` implementation.
* A common interface for a `ReadableStream` implementation.
*
* @public
*/
@ -643,7 +643,7 @@ export declare type UnderlyingByteSourceStartCallback = (
*
* @public
*/
export declare interface UnderlyingSink<W> {
export declare interface UnderlyingSink<in W> {
/**
* A function that is called immediately during creation of the {@link WritableStream}.
*/
@ -749,7 +749,7 @@ export declare type UnderlyingSourceStartCallback<R> = (
*
* @public
*/
export declare class WritableStream<W> {
export declare class WritableStream<in W> {
constructor(
underlyingSink?: UnderlyingSink<W>,
strategy?: QueuingStrategy<W>,
@ -822,7 +822,7 @@ export declare class WritableStreamDefaultController {
*
* @public
*/
export declare class WritableStreamDefaultWriter<W> {
export declare class WritableStreamDefaultWriter<in W> {
constructor(stream: WritableStream<W>);
/**
* Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or

View file

@ -275,7 +275,7 @@ export class Struct<
#extra: Record<PropertyKey, unknown> = {};
#postDeserialized?: StructPostDeserialized<TFields, unknown> | undefined;
#postDeserialized?: StructPostDeserialized<never, unknown> | undefined;
constructor(options?: Partial<Readonly<StructOptions>>) {
this.options = { ...StructDefaultOptions, ...options };
@ -625,8 +625,8 @@ export class Struct<
// Run `postDeserialized`
if (this.#postDeserialized) {
const override = this.#postDeserialized.call(
value as TFields,
value as TFields,
value as never,
value as never,
);
// If it returns a new value, use that as result
// Otherwise it only inspects/mutates the object in place.
@ -640,15 +640,16 @@ export class Struct<
.valueOrPromise();
}
serialize(init: Evaluate<Omit<TFields, TOmitInitKey>>): Uint8Array;
serialize(
init: Evaluate<Omit<TFields, TOmitInitKey>>,
output: Uint8Array,
): number;
/**
* Serialize a struct value to a buffer.
* @param init Fields of the struct
* @param output The buffer to serialize the struct to. It must be large enough to hold the entire struct. If not provided, a new buffer will be created.
* @returns A view of `output` that contains the serialized struct, or a new buffer if `output` is not provided.
*/
serialize(
init: Evaluate<Omit<TFields, TOmitInitKey>>,
output?: Uint8Array,
): Uint8Array | number {
): Uint8Array {
let structValue: StructValue;
if (isStructValueInit(init)) {
structValue = init[STRUCT_VALUE_SYMBOL];
@ -683,10 +684,10 @@ export class Struct<
structSize += size;
}
let outputType = "number";
if (!output) {
output = new Uint8Array(structSize);
outputType = "Uint8Array";
} else if (output.length < structSize) {
throw new Error("Output buffer is too small");
}
const dataView = new DataView(
@ -700,8 +701,8 @@ export class Struct<
offset += size;
}
if (outputType === "number") {
return structSize;
if (output.length !== structSize) {
return output.subarray(0, structSize);
} else {
return output;
}

View file

@ -1,17 +1,11 @@
/// <reference types="node" />
import eslint from "@eslint/js";
import { existsSync } from "fs";
import { resolve } from "path";
import { dirname, resolve } from "path";
import tslint from "typescript-eslint";
import { fileURLToPath } from "url";
const cwd = process.cwd();
const project = [];
if (existsSync(resolve(cwd, "tsconfig.test.json"))) {
project.push("./tsconfig.test.json");
} else {
project.push("./tsconfig.build.json");
}
const root = resolve(dirname(fileURLToPath(import.meta.url)), "..", "..");
export default tslint.config(
eslint.configs.recommended,
@ -22,8 +16,12 @@ export default tslint.config(
{
languageOptions: {
parserOptions: {
tsconfigRootDir: cwd,
project: project,
tsconfigRootDir: root,
project: [
"libraries/*/tsconfig.test.json",
"libraries/*/tsconfig.build.json",
"apps/*/tsconfig.build.json",
],
},
},
rules: {
@ -67,15 +65,12 @@ export default tslint.config(
"@typescript-eslint/no-namespace": "off",
"@typescript-eslint/array-type": "error",
"@typescript-eslint/consistent-type-definitions": "error",
"@typescript-eslint/consistent-generic-constructors": "error",
"@typescript-eslint/consistent-indexed-object-style": "error",
"@typescript-eslint/no-this-alias": "error",
"@typescript-eslint/consistent-type-imports": [
"error",
{
prefer: "type-imports",
disallowTypeAnnotations: true,
fixStyle: "inline-type-imports",
},
],
"@typescript-eslint/consistent-type-imports": "error",
"@typescript-eslint/no-import-type-side-effects": "error",
},
},
);