diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 43cebcc9..2fb34a25 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -6,10 +6,7 @@ import "source-map-support/register.js"; import { Adb, AdbServerClient } from "@yume-chan/adb"; import { AdbServerNodeTcpConnector } from "@yume-chan/adb-server-node-tcp"; -import { - ConsumableWritableStream, - WritableStream, -} from "@yume-chan/stream-extra"; +import { WritableStream } from "@yume-chan/stream-extra"; import { program } from "commander"; program @@ -142,7 +139,7 @@ createDeviceCommand("shell [args...]") process.stdin.setRawMode(true); process.stdin.on("data", (data: Uint8Array) => { - ConsumableWritableStream.write(stdinWriter, data).catch((e) => { + stdinWriter.write(data).catch((e) => { console.error(e); process.exit(1); }); diff --git a/libraries/adb-daemon-webusb/src/device.ts b/libraries/adb-daemon-webusb/src/device.ts index 626bc0ad..dd7ebf14 100644 --- a/libraries/adb-daemon-webusb/src/device.ts +++ b/libraries/adb-daemon-webusb/src/device.ts @@ -14,8 +14,8 @@ import type { WritableStream, } from "@yume-chan/stream-extra"; import { - ConsumableWritableStream, DuplexStreamFactory, + MaybeConsumable, ReadableStream, pipeFrom, } from "@yume-chan/stream-extra"; @@ -188,7 +188,7 @@ export class AdbDaemonWebUsbConnection const zeroMask = outEndpoint.packetSize - 1; this.#writable = pipeFrom( duplex.createWritable( - new ConsumableWritableStream({ + new MaybeConsumable.WritableStream({ write: async (chunk) => { try { await device.raw.transferOut( diff --git a/libraries/adb-scrcpy/src/client.ts b/libraries/adb-scrcpy/src/client.ts index 6b9aee8a..1c24eb4d 100644 --- a/libraries/adb-scrcpy/src/client.ts +++ b/libraries/adb-scrcpy/src/client.ts @@ -23,6 +23,7 @@ import { } from "@yume-chan/scrcpy"; import type { Consumable, + MaybeConsumable, ReadableStream, ReadableWritablePair, } from "@yume-chan/stream-extra"; @@ -100,7 +101,7 @@ export type AdbScrcpyAudioStreamMetadata = export class AdbScrcpyClient { static async pushServer( adb: Adb, - file: ReadableStream>, + file: ReadableStream>, filename = DEFAULT_SERVER_PATH, ) { const sync = await adb.sync(); diff --git a/libraries/adb-server-node-tcp/src/index.ts b/libraries/adb-server-node-tcp/src/index.ts index 43b7c698..cbff3e38 100644 --- a/libraries/adb-server-node-tcp/src/index.ts +++ b/libraries/adb-server-node-tcp/src/index.ts @@ -8,8 +8,8 @@ import type { AdbServerConnector, } from "@yume-chan/adb"; import { + MaybeConsumable, PushReadableStream, - UnwrapConsumableStream, WrapWritableStream, WritableStream, } from "@yume-chan/stream-extra"; @@ -101,7 +101,7 @@ export class AdbServerNodeTcpConnector implements AdbServerConnector { readable: connection.readable, writable: new WrapWritableStream( connection.writable, - ).bePipedThroughFrom(new UnwrapConsumableStream()), + ).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()), get closed() { return connection.closed; }, diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index 3ccace61..56cceb16 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -1,4 +1,7 @@ -import type { Consumable, ReadableWritablePair } from "@yume-chan/stream-extra"; +import type { + MaybeConsumable, + ReadableWritablePair, +} from "@yume-chan/stream-extra"; import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra"; import type { ValueOrPromise } from "@yume-chan/struct"; @@ -20,7 +23,7 @@ export interface Closeable { } export interface AdbSocket - extends ReadableWritablePair>, + extends ReadableWritablePair>, Closeable { get service(): string; diff --git a/libraries/adb/src/commands/subprocess/protocols/none.ts b/libraries/adb/src/commands/subprocess/protocols/none.ts index e012d297..3ad57522 100644 --- a/libraries/adb/src/commands/subprocess/protocols/none.ts +++ b/libraries/adb/src/commands/subprocess/protocols/none.ts @@ -1,4 +1,4 @@ -import type { Consumable, WritableStream } from "@yume-chan/stream-extra"; +import type { MaybeConsumable, WritableStream } from "@yume-chan/stream-extra"; import { ReadableStream } from "@yume-chan/stream-extra"; import type { Adb, AdbSocket } from "../../../adb.js"; @@ -36,7 +36,7 @@ export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol { readonly #socket: AdbSocket; // Legacy shell forwards all data to stdin. - get stdin(): WritableStream> { + get stdin(): WritableStream> { return this.#socket.writable; } diff --git a/libraries/adb/src/commands/subprocess/protocols/shell.ts b/libraries/adb/src/commands/subprocess/protocols/shell.ts index 476eb1c7..ffabf3c6 100644 --- a/libraries/adb/src/commands/subprocess/protocols/shell.ts +++ b/libraries/adb/src/commands/subprocess/protocols/shell.ts @@ -1,15 +1,12 @@ import { PromiseResolver } from "@yume-chan/async"; -import type { - Consumable, - PushReadableStreamController, - ReadableStream, - WritableStreamDefaultWriter, -} from "@yume-chan/stream-extra"; import { - ConsumableWritableStream, + MaybeConsumable, PushReadableStream, StructDeserializeStream, WritableStream, + type PushReadableStreamController, + type ReadableStream, + type WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; import type { StructValueType } from "@yume-chan/struct"; import Struct, { placeholder } from "@yume-chan/struct"; @@ -64,9 +61,9 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol { } readonly #socket: AdbSocket; - #writer: WritableStreamDefaultWriter>; + #writer: WritableStreamDefaultWriter>; - #stdin: WritableStream>; + #stdin: WritableStream>; get stdin() { return this.#stdin; } @@ -140,23 +137,22 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol { this.#writer = this.#socket.writable.getWriter(); - this.#stdin = new WritableStream>({ + this.#stdin = new MaybeConsumable.WritableStream({ write: async (chunk) => { - await ConsumableWritableStream.write( - this.#writer, - AdbShellProtocolPacket.serialize({ - id: AdbShellProtocolId.Stdin, - data: chunk.value, - }), - ); - chunk.consume(); + await MaybeConsumable.tryConsume(chunk, async (chunk) => { + await this.#writer.write( + AdbShellProtocolPacket.serialize({ + id: AdbShellProtocolId.Stdin, + data: chunk, + }), + ); + }); }, }); } async resize(rows: number, cols: number) { - await ConsumableWritableStream.write( - this.#writer, + await this.#writer.write( AdbShellProtocolPacket.serialize({ id: AdbShellProtocolId.WindowSizeChange, // The "correct" format is `${rows}x${cols},${x_pixels}x${y_pixels}` diff --git a/libraries/adb/src/commands/subprocess/protocols/types.ts b/libraries/adb/src/commands/subprocess/protocols/types.ts index fdc4584b..bfd86043 100644 --- a/libraries/adb/src/commands/subprocess/protocols/types.ts +++ b/libraries/adb/src/commands/subprocess/protocols/types.ts @@ -1,5 +1,5 @@ import type { - Consumable, + MaybeConsumable, ReadableStream, WritableStream, } from "@yume-chan/stream-extra"; @@ -11,7 +11,7 @@ export interface AdbSubprocessProtocol { /** * A WritableStream that writes to the `stdin` stream. */ - readonly stdin: WritableStream>; + readonly stdin: WritableStream>; /** * The `stdout` stream of the process. diff --git a/libraries/adb/src/commands/sync/push.ts b/libraries/adb/src/commands/sync/push.ts index 53ade0ff..ff0e0a99 100644 --- a/libraries/adb/src/commands/sync/push.ts +++ b/libraries/adb/src/commands/sync/push.ts @@ -1,8 +1,8 @@ -import type { Consumable, ReadableStream } from "@yume-chan/stream-extra"; import { AbortController, - ConsumableWritableStream, DistributionStream, + MaybeConsumable, + type ReadableStream, } from "@yume-chan/stream-extra"; import Struct, { placeholder } from "@yume-chan/struct"; @@ -18,7 +18,7 @@ export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024; export interface AdbSyncPushV1Options { socket: AdbSyncSocket; filename: string; - file: ReadableStream>; + file: ReadableStream>; type?: LinuxFileType; permission?: number; mtime?: number; @@ -31,7 +31,7 @@ export const AdbSyncOkResponse = new Struct({ littleEndian: true }).uint32( async function pipeFileData( locked: AdbSyncSocketLocked, - file: ReadableStream>, + file: ReadableStream>, packetSize: number, mtime: number, ) { @@ -40,7 +40,7 @@ async function pipeFileData( const abortController = new AbortController(); file.pipeThrough(new DistributionStream(packetSize, true)) .pipeTo( - new ConsumableWritableStream({ + new MaybeConsumable.WritableStream({ write: async (chunk) => { await adbSyncWriteRequest( locked, diff --git a/libraries/adb/src/commands/sync/request.ts b/libraries/adb/src/commands/sync/request.ts index 49bb50db..de574da5 100644 --- a/libraries/adb/src/commands/sync/request.ts +++ b/libraries/adb/src/commands/sync/request.ts @@ -19,10 +19,6 @@ export const AdbSyncNumberRequest = new Struct({ littleEndian: true }) .string("id", { length: 4 }) .uint32("arg"); -export const AdbSyncDataRequest = new Struct({ littleEndian: true }) - .concat(AdbSyncNumberRequest) - .uint8Array("data", { lengthField: "arg" }); - export interface AdbSyncWritable { write(buffer: Uint8Array): Promise; } @@ -33,22 +29,21 @@ export async function adbSyncWriteRequest( value: number | string | Uint8Array, ): Promise { if (typeof value === "number") { - const buffer = AdbSyncNumberRequest.serialize({ - id, - arg: value, - }); - await writable.write(buffer); - } else if (typeof value === "string") { - // Let `writable` buffer writes - const buffer = encodeUtf8(value); await writable.write( - AdbSyncNumberRequest.serialize({ id, arg: buffer.byteLength }), + AdbSyncNumberRequest.serialize({ id, arg: value }), ); - await writable.write(buffer); - } else { - await writable.write( - AdbSyncNumberRequest.serialize({ id, arg: value.byteLength }), - ); - await writable.write(value); + return; } + + if (typeof value === "string") { + value = encodeUtf8(value); + } + + // `writable` will copy inputs to an internal buffer, + // so we write header and `buffer` separately, + // to avoid an extra copy. + await writable.write( + AdbSyncNumberRequest.serialize({ id, arg: value.byteLength }), + ); + await writable.write(value); } diff --git a/libraries/adb/src/commands/sync/socket.ts b/libraries/adb/src/commands/sync/socket.ts index a187ab7e..15ff938a 100644 --- a/libraries/adb/src/commands/sync/socket.ts +++ b/libraries/adb/src/commands/sync/socket.ts @@ -1,11 +1,11 @@ import type { - Consumable, + MaybeConsumable, WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; import { BufferCombiner, BufferedReadableStream, - ConsumableWritableStream, + Consumable, } from "@yume-chan/stream-extra"; import type { AsyncExactReadable } from "@yume-chan/struct"; @@ -13,7 +13,7 @@ import type { AdbSocket } from "../../adb.js"; import { AutoResetEvent } from "../../utils/index.js"; export class AdbSyncSocketLocked implements AsyncExactReadable { - readonly #writer: WritableStreamDefaultWriter>; + readonly #writer: WritableStreamDefaultWriter>; readonly #readable: BufferedReadableStream; readonly #socketLock: AutoResetEvent; readonly #writeLock = new AutoResetEvent(); @@ -24,7 +24,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable { } constructor( - writer: WritableStreamDefaultWriter>, + writer: WritableStreamDefaultWriter>, readable: BufferedReadableStream, bufferSize: number, lock: AutoResetEvent, @@ -35,8 +35,8 @@ export class AdbSyncSocketLocked implements AsyncExactReadable { this.#combiner = new BufferCombiner(bufferSize); } - async #writeInnerStream(buffer: Uint8Array) { - await ConsumableWritableStream.write(this.#writer, buffer); + async #writeConsumable(buffer: Uint8Array) { + await Consumable.WritableStream.write(this.#writer, buffer); } async flush() { @@ -44,7 +44,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable { await this.#writeLock.wait(); const buffer = this.#combiner.flush(); if (buffer) { - await this.#writeInnerStream(buffer); + await this.#writeConsumable(buffer); } } finally { this.#writeLock.notifyOne(); @@ -55,7 +55,7 @@ export class AdbSyncSocketLocked implements AsyncExactReadable { try { await this.#writeLock.wait(); for (const buffer of this.#combiner.push(data)) { - await this.#writeInnerStream(buffer); + await this.#writeConsumable(buffer); } } finally { this.#writeLock.notifyOne(); diff --git a/libraries/adb/src/commands/sync/sync.ts b/libraries/adb/src/commands/sync/sync.ts index d3595adb..02fd5a1e 100644 --- a/libraries/adb/src/commands/sync/sync.ts +++ b/libraries/adb/src/commands/sync/sync.ts @@ -1,5 +1,5 @@ import { AutoDisposable } from "@yume-chan/event"; -import type { Consumable, ReadableStream } from "@yume-chan/stream-extra"; +import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra"; import type { Adb, AdbSocket } from "../../adb.js"; import { AdbFeature } from "../../features.js"; @@ -31,7 +31,7 @@ export function dirname(path: string): string { export interface AdbSyncWriteOptions { filename: string; - file: ReadableStream>; + file: ReadableStream>; type?: LinuxFileType; permission?: number; mtime?: number; diff --git a/libraries/adb/src/daemon/dispatcher.ts b/libraries/adb/src/daemon/dispatcher.ts index 38a19ed7..dd0fb308 100644 --- a/libraries/adb/src/daemon/dispatcher.ts +++ b/libraries/adb/src/daemon/dispatcher.ts @@ -3,15 +3,12 @@ import { PromiseResolver, delay, } from "@yume-chan/async"; -import type { - Consumable, - ReadableWritablePair, - WritableStreamDefaultWriter, -} from "@yume-chan/stream-extra"; import { AbortController, - ConsumableWritableStream, + Consumable, WritableStream, + type ReadableWritablePair, + type WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; import { EMPTY_UINT8_ARRAY, NumberFieldType } from "@yume-chan/struct"; @@ -231,7 +228,10 @@ export class AdbPacketDispatcher implements Closeable { let payload: Uint8Array; if (this.options.initialDelayedAckBytes !== 0) { payload = new Uint8Array(4); - new DataView(payload.buffer).setUint32(0, ackBytes, true); + payload[0] = ackBytes & 0xff; + payload[1] = (ackBytes >> 8) & 0xff; + payload[2] = (ackBytes >> 16) & 0xff; + payload[3] = (ackBytes >> 24) & 0xff; } else { payload = EMPTY_UINT8_ARRAY; } @@ -374,7 +374,7 @@ export class AdbPacketDispatcher implements Closeable { throw new Error("payload too large"); } - await ConsumableWritableStream.write(this.#writer, { + await Consumable.WritableStream.write(this.#writer, { command, arg0, arg1, diff --git a/libraries/adb/src/daemon/packet.ts b/libraries/adb/src/daemon/packet.ts index 91df199f..051acf30 100644 --- a/libraries/adb/src/daemon/packet.ts +++ b/libraries/adb/src/daemon/packet.ts @@ -1,4 +1,8 @@ -import { ConsumableTransformStream } from "@yume-chan/stream-extra"; +import type { Consumable } from "@yume-chan/stream-extra"; +import { + ConsumableReadableStream, + TransformStream, +} from "@yume-chan/stream-extra"; import Struct from "@yume-chan/struct"; export enum AdbCommand { @@ -49,26 +53,33 @@ export function calculateChecksum(payload: Uint8Array): number { return payload.reduce((result, item) => result + item, 0); } -export class AdbPacketSerializeStream extends ConsumableTransformStream< - AdbPacketInit, - Uint8Array +export class AdbPacketSerializeStream extends TransformStream< + Consumable, + Consumable > { constructor() { const headerBuffer = new Uint8Array(AdbPacketHeader.size); super({ transform: async (chunk, controller) => { - const init = chunk as AdbPacketInit & AdbPacketHeaderInit; - init.payloadLength = init.payload.byteLength; + await chunk.tryConsume(async (chunk) => { + const init = chunk as AdbPacketInit & AdbPacketHeaderInit; + init.payloadLength = init.payload.byteLength; - AdbPacketHeader.serialize(init, headerBuffer); - await controller.enqueue(headerBuffer); + await ConsumableReadableStream.enqueue( + controller, + AdbPacketHeader.serialize(init, headerBuffer), + ); - if (init.payload.byteLength) { - // USB protocol preserves packet boundaries, - // so we must write payload separately as native ADB does, - // otherwise the read operation on device will fail. - await controller.enqueue(init.payload); - } + if (init.payload.byteLength) { + // USB protocol preserves packet boundaries, + // so we must write payload separately as native ADB does, + // otherwise the read operation on device will fail. + await ConsumableReadableStream.enqueue( + controller, + init.payload, + ); + } + }); }, }); } diff --git a/libraries/adb/src/daemon/socket.ts b/libraries/adb/src/daemon/socket.ts index 31befc59..d3937d39 100644 --- a/libraries/adb/src/daemon/socket.ts +++ b/libraries/adb/src/daemon/socket.ts @@ -1,16 +1,13 @@ import { PromiseResolver } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; -import type { - AbortSignal, - Consumable, - PushReadableStreamController, - ReadableStream, - WritableStream, - WritableStreamDefaultController, -} from "@yume-chan/stream-extra"; import { - ConsumableWritableStream, + MaybeConsumable, PushReadableStream, + type AbortSignal, + type PushReadableStreamController, + type ReadableStream, + type WritableStream, + type WritableStreamDefaultController, } from "@yume-chan/stream-extra"; import type { AdbSocket } from "../adb.js"; @@ -50,7 +47,7 @@ export class AdbDaemonSocketController } #writableController!: WritableStreamDefaultController; - readonly writable: WritableStream>; + readonly writable: WritableStream>; #closed = false; @@ -86,7 +83,7 @@ export class AdbDaemonSocketController this.#readableController = controller; }); - this.writable = new ConsumableWritableStream({ + this.writable = new MaybeConsumable.WritableStream({ start: (controller) => { this.#writableController = controller; }, @@ -216,7 +213,7 @@ export class AdbDaemonSocket implements AdbDaemonSocketInfo, AdbSocket { get readable(): ReadableStream { return this.#controller.readable; } - get writable(): WritableStream> { + get writable(): WritableStream> { return this.#controller.writable; } diff --git a/libraries/adb/src/daemon/transport.ts b/libraries/adb/src/daemon/transport.ts index daca6996..d1be3975 100644 --- a/libraries/adb/src/daemon/transport.ts +++ b/libraries/adb/src/daemon/transport.ts @@ -1,9 +1,9 @@ import { PromiseResolver } from "@yume-chan/async"; -import type { Consumable, ReadableWritablePair } from "@yume-chan/stream-extra"; import { AbortController, - ConsumableWritableStream, + Consumable, WritableStream, + type ReadableWritablePair, } from "@yume-chan/stream-extra"; import type { ValueOrPromise } from "@yume-chan/struct"; import { decodeUtf8, encodeUtf8 } from "@yume-chan/struct"; @@ -187,7 +187,10 @@ export class AdbDaemonTransport implements AdbTransport { // Because we don't know if the device needs it or not. (init as AdbPacketInit).checksum = calculateChecksum(init.payload); (init as AdbPacketInit).magic = init.command ^ 0xffffffff; - await ConsumableWritableStream.write(writer, init as AdbPacketInit); + await Consumable.WritableStream.write( + writer, + init as AdbPacketInit, + ); } const actualFeatures = features.slice(); diff --git a/libraries/adb/src/server/client.ts b/libraries/adb/src/server/client.ts index a3615a8a..68ea3c99 100644 --- a/libraries/adb/src/server/client.ts +++ b/libraries/adb/src/server/client.ts @@ -8,7 +8,7 @@ import type { } from "@yume-chan/stream-extra"; import { BufferedReadableStream, - UnwrapConsumableStream, + MaybeConsumable, WrapWritableStream, } from "@yume-chan/stream-extra"; import type { @@ -408,7 +408,7 @@ export class AdbServerClient { readable: readable.release(), writable: new WrapWritableStream( connection.writable, - ).bePipedThroughFrom(new UnwrapConsumableStream()), + ).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()), get closed() { return connection.closed; }, diff --git a/libraries/android-bin/src/bu.ts b/libraries/android-bin/src/bu.ts index c6982b74..4610b011 100644 --- a/libraries/android-bin/src/bu.ts +++ b/libraries/android-bin/src/bu.ts @@ -1,5 +1,5 @@ import { AdbCommandBase } from "@yume-chan/adb"; -import type { Consumable, ReadableStream } from "@yume-chan/stream-extra"; +import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra"; import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra"; export interface AdbBackupOptions { @@ -15,7 +15,7 @@ export interface AdbBackupOptions { export interface AdbRestoreOptions { user: number; - file: ReadableStream>; + file: ReadableStream>; } export class AdbBackup extends AdbCommandBase { diff --git a/libraries/android-bin/src/pm.ts b/libraries/android-bin/src/pm.ts index 53a60700..f874b579 100644 --- a/libraries/android-bin/src/pm.ts +++ b/libraries/android-bin/src/pm.ts @@ -5,7 +5,7 @@ import type { Adb } from "@yume-chan/adb"; import { AdbCommandBase, escapeArg } from "@yume-chan/adb"; -import type { Consumable, ReadableStream } from "@yume-chan/stream-extra"; +import type { MaybeConsumable, ReadableStream } from "@yume-chan/stream-extra"; import { ConcatStringStream, DecodeUtf8Stream, @@ -295,7 +295,7 @@ export class PackageManager extends AdbCommandBase { } async pushAndInstallStream( - stream: ReadableStream>, + stream: ReadableStream>, options?: Partial, ): Promise { const sync = await this.adb.sync(); @@ -334,7 +334,7 @@ export class PackageManager extends AdbCommandBase { async installStream( size: number, - stream: ReadableStream>, + stream: ReadableStream>, options?: Partial, ): Promise { // Android 7 added both `cmd` command and streaming install support, @@ -548,7 +548,7 @@ export class PackageManager extends AdbCommandBase { sessionId: number, splitName: string, size: number, - stream: ReadableStream>, + stream: ReadableStream>, ) { const args: string[] = [ "pm", @@ -625,7 +625,7 @@ export class PackageManagerInstallSession { addSplitStream( splitName: string, size: number, - stream: ReadableStream>, + stream: ReadableStream>, ) { return this.#packageManager.sessionAddSplitStream( this.#id, diff --git a/libraries/aoa/src/keyboard.ts b/libraries/aoa/src/keyboard.ts index 8a7e5dd7..6b79faf9 100644 --- a/libraries/aoa/src/keyboard.ts +++ b/libraries/aoa/src/keyboard.ts @@ -274,7 +274,7 @@ export class HidKeyboard { static readonly REPORT_SIZE = 8; #modifiers = 0; - #keys: Set = new Set(); + #keys = new Set(); down(key: HidKeyCode) { if (key >= HidKeyCode.ControlLeft && key <= HidKeyCode.MetaRight) { diff --git a/libraries/aoa/src/touchscreen.ts b/libraries/aoa/src/touchscreen.ts index 521c64a0..bdab2048 100644 --- a/libraries/aoa/src/touchscreen.ts +++ b/libraries/aoa/src/touchscreen.ts @@ -85,7 +85,7 @@ export class HidTouchScreen { static readonly DESCRIPTOR = DESCRIPTOR; - #fingers: Map = new Map(); + #fingers = new Map(); down(id: number, x: number, y: number) { if (this.#fingers.size >= 10) { diff --git a/libraries/scrcpy/src/control/writer.ts b/libraries/scrcpy/src/control/writer.ts index 0dc8a061..5397ff76 100644 --- a/libraries/scrcpy/src/control/writer.ts +++ b/libraries/scrcpy/src/control/writer.ts @@ -1,8 +1,7 @@ -import type { +import { Consumable, - WritableStreamDefaultWriter, + type WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; -import { ConsumableWritableStream } from "@yume-chan/stream-extra"; import type { ScrcpyOptions } from "../options/index.js"; @@ -29,7 +28,7 @@ export class ScrcpyControlMessageWriter { } async write(message: Uint8Array) { - await ConsumableWritableStream.write(this.#writer, message); + await Consumable.WritableStream.write(this.#writer, message); } async injectKeyCode( diff --git a/libraries/stream-extra/src/consumable.ts b/libraries/stream-extra/src/consumable.ts index 9993a9c9..27e16679 100644 --- a/libraries/stream-extra/src/consumable.ts +++ b/libraries/stream-extra/src/consumable.ts @@ -5,7 +5,10 @@ import type { WritableStreamDefaultController, WritableStreamDefaultWriter, } from "./stream.js"; -import { ReadableStream, TransformStream, WritableStream } from "./stream.js"; +import { + WritableStream as NativeWritableStream, + ReadableStream, +} from "./stream.js"; interface Task { run(callback: () => T): T; @@ -29,6 +32,10 @@ const createTask: Console["createTask"] = }, })); +function isPromiseLike(value: unknown): value is PromiseLike { + return typeof value === "object" && value !== null && "then" in value; +} + export class Consumable { readonly #task: Task; readonly #resolver: PromiseResolver; @@ -51,11 +58,23 @@ export class Consumable { this.#resolver.reject(error); } - async tryConsume(callback: (value: T) => U) { + tryConsume(callback: (value: T) => U) { try { - // eslint-disable-next-line @typescript-eslint/await-thenable - const result = await this.#task.run(() => callback(this.value)); - this.#resolver.resolve(); + let result = this.#task.run(() => callback(this.value)); + if (isPromiseLike(result)) { + result = result.then( + (value) => { + this.#resolver.resolve(); + return value; + }, + (e) => { + this.#resolver.reject(e); + throw e; + }, + ) as U; + } else { + this.#resolver.resolve(); + } return result; } catch (e) { this.#resolver.reject(e); @@ -64,36 +83,70 @@ export class Consumable { } } -async function enqueue( - controller: { enqueue: (chunk: Consumable) => void }, - chunk: T, -) { - const output = new Consumable(chunk); - controller.enqueue(output); - await output.consumed; -} - -export class WrapConsumableStream extends TransformStream> { - constructor() { - super({ - async transform(chunk, controller) { - await enqueue(controller, chunk); - }, - }); +export namespace Consumable { + export interface WritableStreamSink { + start?( + controller: WritableStreamDefaultController, + ): void | PromiseLike; + write?( + chunk: T, + controller: WritableStreamDefaultController, + ): void | PromiseLike; + abort?(reason: unknown): void | PromiseLike; + close?(): void | PromiseLike; } -} -export class UnwrapConsumableStream extends TransformStream< - Consumable, - T -> { - constructor() { - super({ - transform(chunk, controller) { - controller.enqueue(chunk.value); - chunk.consume(); - }, - }); + export class WritableStream extends NativeWritableStream< + Consumable + > { + static async write( + writer: WritableStreamDefaultWriter>, + value: T, + ) { + const consumable = new Consumable(value); + await writer.write(consumable); + await consumable.consumed; + } + + constructor( + sink: WritableStreamSink, + strategy?: QueuingStrategy, + ) { + let wrappedStrategy: QueuingStrategy> | undefined; + if (strategy) { + wrappedStrategy = {}; + if ("highWaterMark" in strategy) { + wrappedStrategy.highWaterMark = strategy.highWaterMark; + } + if ("size" in strategy) { + wrappedStrategy.size = (chunk) => { + return strategy.size!( + chunk instanceof Consumable ? chunk.value : chunk, + ); + }; + } + } + + super( + { + start(controller) { + return sink.start?.(controller); + }, + async write(chunk, controller) { + await chunk.tryConsume((chunk) => + sink.write?.(chunk, controller), + ); + }, + abort(reason) { + return sink.abort?.(reason); + }, + close() { + return sink.close?.(); + }, + }, + wrappedStrategy, + ); + } } } @@ -114,6 +167,15 @@ export interface ConsumableReadableStreamSource { } export class ConsumableReadableStream extends ReadableStream> { + static async enqueue( + controller: { enqueue: (chunk: Consumable) => void }, + chunk: T, + ) { + const output = new Consumable(chunk); + controller.enqueue(output); + await output.consumed; + } + constructor( source: ConsumableReadableStreamSource, strategy?: QueuingStrategy, @@ -140,7 +202,10 @@ export class ConsumableReadableStream extends ReadableStream> { async start(controller) { wrappedController = { async enqueue(chunk) { - await enqueue(controller, chunk); + await ConsumableReadableStream.enqueue( + controller, + chunk, + ); }, close() { controller.close(); @@ -163,129 +228,3 @@ export class ConsumableReadableStream extends ReadableStream> { ); } } - -export interface ConsumableWritableStreamSink { - start?( - controller: WritableStreamDefaultController, - ): void | PromiseLike; - write?( - chunk: T, - controller: WritableStreamDefaultController, - ): void | PromiseLike; - abort?(reason: unknown): void | PromiseLike; - close?(): void | PromiseLike; -} - -export class ConsumableWritableStream extends WritableStream> { - static async write( - writer: WritableStreamDefaultWriter>, - value: T, - ) { - const consumable = new Consumable(value); - await writer.write(consumable); - await consumable.consumed; - } - - constructor( - sink: ConsumableWritableStreamSink, - strategy?: QueuingStrategy, - ) { - let wrappedStrategy: QueuingStrategy> | undefined; - if (strategy) { - wrappedStrategy = {}; - if ("highWaterMark" in strategy) { - wrappedStrategy.highWaterMark = strategy.highWaterMark; - } - if ("size" in strategy) { - wrappedStrategy.size = (chunk) => { - return strategy.size!(chunk.value); - }; - } - } - - super( - { - start(controller) { - return sink.start?.(controller); - }, - async write(chunk, controller) { - await chunk.tryConsume((value) => - sink.write?.(value, controller), - ); - }, - abort(reason) { - return sink.abort?.(reason); - }, - close() { - return sink.close?.(); - }, - }, - wrappedStrategy, - ); - } -} - -export interface ConsumableTransformer { - start?( - controller: ConsumableReadableStreamController, - ): void | PromiseLike; - transform?( - chunk: I, - controller: ConsumableReadableStreamController, - ): void | PromiseLike; - flush?( - controller: ConsumableReadableStreamController, - ): void | PromiseLike; -} - -export class ConsumableTransformStream extends TransformStream< - Consumable, - Consumable -> { - constructor(transformer: ConsumableTransformer) { - let wrappedController: - | ConsumableReadableStreamController - | undefined; - - super({ - async start(controller) { - wrappedController = { - async enqueue(chunk) { - await enqueue(controller, chunk); - }, - close() { - controller.terminate(); - }, - error(reason) { - controller.error(reason); - }, - }; - - await transformer.start?.(wrappedController); - }, - async transform(chunk) { - await chunk.tryConsume((value) => - transformer.transform?.(value, wrappedController!), - ); - chunk.consume(); - }, - async flush() { - await transformer.flush?.(wrappedController!); - }, - }); - } -} - -export class ConsumableInspectStream extends TransformStream< - Consumable, - Consumable -> { - constructor(callback: (value: T) => void) { - super({ - transform(chunk, controller) { - callback(chunk.value); - controller.enqueue(chunk); - }, - }); - } -} diff --git a/libraries/stream-extra/src/distribution.spec.ts b/libraries/stream-extra/src/distribution.spec.ts index fe1e617f..0a6744ff 100644 --- a/libraries/stream-extra/src/distribution.spec.ts +++ b/libraries/stream-extra/src/distribution.spec.ts @@ -1,10 +1,8 @@ import { describe, expect, it, jest } from "@jest/globals"; -import { - ConsumableReadableStream, - ConsumableWritableStream, -} from "./consumable.js"; +import { ConsumableReadableStream } from "./consumable.js"; import { DistributionStream } from "./distribution.js"; +import { MaybeConsumable } from "./maybe-consumable.js"; const TestData = new Uint8Array(50); for (let i = 0; i < 50; i += 1) { @@ -32,7 +30,7 @@ async function testInputOutput( }) .pipeThrough(new DistributionStream(10, combine || undefined)) .pipeTo( - new ConsumableWritableStream({ + new MaybeConsumable.WritableStream({ write(chunk) { // chunk will be reused, so we need to copy it write(chunk.slice()); diff --git a/libraries/stream-extra/src/distribution.ts b/libraries/stream-extra/src/distribution.ts index 15b5d1e5..fd2a99c4 100644 --- a/libraries/stream-extra/src/distribution.ts +++ b/libraries/stream-extra/src/distribution.ts @@ -1,4 +1,6 @@ -import { ConsumableTransformStream } from "./consumable.js"; +import { ConsumableReadableStream } from "./consumable.js"; +import { MaybeConsumable } from "./maybe-consumable.js"; +import { TransformStream } from "./stream.js"; /** * Splits or combines buffers to specified size. @@ -77,34 +79,42 @@ export class BufferCombiner { } } -export class DistributionStream extends ConsumableTransformStream< - Uint8Array, - Uint8Array +export class DistributionStream extends TransformStream< + MaybeConsumable, + MaybeConsumable > { constructor(size: number, combine = false) { const combiner = combine ? new BufferCombiner(size) : undefined; super({ async transform(chunk, controller) { - if (combiner) { - for (const buffer of combiner.push(chunk)) { - await controller.enqueue(buffer); + await MaybeConsumable.tryConsume(chunk, async (chunk) => { + if (combiner) { + for (const buffer of combiner.push(chunk)) { + await ConsumableReadableStream.enqueue( + controller, + buffer, + ); + } + } else { + let offset = 0; + let available = chunk.byteLength; + while (available > 0) { + const end = offset + size; + await ConsumableReadableStream.enqueue( + controller, + chunk.subarray(offset, end), + ); + offset = end; + available -= size; + } } - } else { - let offset = 0; - let available = chunk.byteLength; - while (available > 0) { - const end = offset + size; - await controller.enqueue(chunk.subarray(offset, end)); - offset = end; - available -= size; - } - } + }); }, - async flush(controller) { + flush(controller) { if (combiner) { const data = combiner.flush(); if (data) { - await controller.enqueue(data); + controller.enqueue(data); } } }, diff --git a/libraries/stream-extra/src/index.ts b/libraries/stream-extra/src/index.ts index 664394f9..4493aa5e 100644 --- a/libraries/stream-extra/src/index.ts +++ b/libraries/stream-extra/src/index.ts @@ -6,6 +6,7 @@ export * from "./decode-utf8.js"; export * from "./distribution.js"; export * from "./duplex.js"; export * from "./inspect.js"; +export * from "./maybe-consumable.js"; export * from "./pipe-from.js"; export * from "./push-readable.js"; export * from "./split-string.js"; diff --git a/libraries/stream-extra/src/maybe-consumable.ts b/libraries/stream-extra/src/maybe-consumable.ts new file mode 100644 index 00000000..516a781b --- /dev/null +++ b/libraries/stream-extra/src/maybe-consumable.ts @@ -0,0 +1,95 @@ +import { Consumable } from "./consumable.js"; +import type { WritableStreamDefaultController } from "./stream.js"; +import { + WritableStream as NativeWritableStream, + TransformStream, + type QueuingStrategy, +} from "./stream.js"; + +export type MaybeConsumable = T | Consumable; + +export namespace MaybeConsumable { + export function tryConsume( + value: T, + callback: (value: T extends Consumable ? U : T) => R, + ): R { + if (value instanceof Consumable) { + return value.tryConsume(callback); + } else { + return callback(value as never); + } + } + + export class UnwrapStream extends TransformStream< + MaybeConsumable, + T + > { + constructor() { + super({ + transform(chunk, controller) { + MaybeConsumable.tryConsume(chunk, (chunk) => { + controller.enqueue(chunk as T); + }); + }, + }); + } + } + + export interface WritableStreamSink { + start?( + controller: WritableStreamDefaultController, + ): void | PromiseLike; + write?( + chunk: T, + controller: WritableStreamDefaultController, + ): void | PromiseLike; + abort?(reason: unknown): void | PromiseLike; + close?(): void | PromiseLike; + } + + export class WritableStream extends NativeWritableStream< + MaybeConsumable + > { + constructor( + sink: WritableStreamSink, + strategy?: QueuingStrategy, + ) { + let wrappedStrategy: + | QueuingStrategy> + | undefined; + if (strategy) { + wrappedStrategy = {}; + if ("highWaterMark" in strategy) { + wrappedStrategy.highWaterMark = strategy.highWaterMark; + } + if ("size" in strategy) { + wrappedStrategy.size = (chunk) => { + return strategy.size!( + chunk instanceof Consumable ? chunk.value : chunk, + ); + }; + } + } + + super( + { + start(controller) { + return sink.start?.(controller); + }, + async write(chunk, controller) { + await MaybeConsumable.tryConsume(chunk, (chunk) => + sink.write?.(chunk as T, controller), + ); + }, + abort(reason) { + return sink.abort?.(reason); + }, + close() { + return sink.close?.(); + }, + }, + wrappedStrategy, + ); + } + } +} diff --git a/libraries/stream-extra/src/stream.ts b/libraries/stream-extra/src/stream.ts index 623d739e..4e977030 100644 --- a/libraries/stream-extra/src/stream.ts +++ b/libraries/stream-extra/src/stream.ts @@ -36,10 +36,10 @@ const Global = globalThis as unknown as GlobalExtension; export const AbortController = Global.AbortController; -export type ReadableStream = ReadableStreamType; +export type ReadableStream = ReadableStreamType; export const ReadableStream = Global.ReadableStream; -export type WritableStream = WritableStreamType; +export type WritableStream = WritableStreamType; export const WritableStream = Global.WritableStream; export type TransformStream = TransformStreamType; diff --git a/libraries/stream-extra/src/types.ts b/libraries/stream-extra/src/types.ts index f66ce907..92c008c6 100644 --- a/libraries/stream-extra/src/types.ts +++ b/libraries/stream-extra/src/types.ts @@ -136,7 +136,7 @@ export declare class ReadableByteStreamController { * * @public */ -export declare class ReadableStream implements AsyncIterable { +export declare class ReadableStream implements AsyncIterable { constructor( underlyingSource: UnderlyingByteSource, strategy?: { @@ -380,7 +380,7 @@ export declare class ReadableStreamDefaultController { * * @public */ -export declare class ReadableStreamDefaultReader { +export declare class ReadableStreamDefaultReader { constructor(stream: ReadableStream); /** * Returns a promise that will be fulfilled when the stream becomes closed, @@ -446,7 +446,7 @@ export declare interface ReadableStreamIteratorOptions { } /** - * A common interface for a `ReadadableStream` implementation. + * A common interface for a `ReadableStream` implementation. * * @public */ @@ -643,7 +643,7 @@ export declare type UnderlyingByteSourceStartCallback = ( * * @public */ -export declare interface UnderlyingSink { +export declare interface UnderlyingSink { /** * A function that is called immediately during creation of the {@link WritableStream}. */ @@ -749,7 +749,7 @@ export declare type UnderlyingSourceStartCallback = ( * * @public */ -export declare class WritableStream { +export declare class WritableStream { constructor( underlyingSink?: UnderlyingSink, strategy?: QueuingStrategy, @@ -822,7 +822,7 @@ export declare class WritableStreamDefaultController { * * @public */ -export declare class WritableStreamDefaultWriter { +export declare class WritableStreamDefaultWriter { constructor(stream: WritableStream); /** * Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or diff --git a/libraries/struct/src/struct.ts b/libraries/struct/src/struct.ts index 000a30b4..69c7c0b0 100644 --- a/libraries/struct/src/struct.ts +++ b/libraries/struct/src/struct.ts @@ -275,7 +275,7 @@ export class Struct< #extra: Record = {}; - #postDeserialized?: StructPostDeserialized | undefined; + #postDeserialized?: StructPostDeserialized | undefined; constructor(options?: Partial>) { this.options = { ...StructDefaultOptions, ...options }; @@ -625,8 +625,8 @@ export class Struct< // Run `postDeserialized` if (this.#postDeserialized) { const override = this.#postDeserialized.call( - value as TFields, - value as TFields, + value as never, + value as never, ); // If it returns a new value, use that as result // Otherwise it only inspects/mutates the object in place. @@ -640,15 +640,16 @@ export class Struct< .valueOrPromise(); } - serialize(init: Evaluate>): Uint8Array; - serialize( - init: Evaluate>, - output: Uint8Array, - ): number; + /** + * Serialize a struct value to a buffer. + * @param init Fields of the struct + * @param output The buffer to serialize the struct to. It must be large enough to hold the entire struct. If not provided, a new buffer will be created. + * @returns A view of `output` that contains the serialized struct, or a new buffer if `output` is not provided. + */ serialize( init: Evaluate>, output?: Uint8Array, - ): Uint8Array | number { + ): Uint8Array { let structValue: StructValue; if (isStructValueInit(init)) { structValue = init[STRUCT_VALUE_SYMBOL]; @@ -683,10 +684,10 @@ export class Struct< structSize += size; } - let outputType = "number"; if (!output) { output = new Uint8Array(structSize); - outputType = "Uint8Array"; + } else if (output.length < structSize) { + throw new Error("Output buffer is too small"); } const dataView = new DataView( @@ -700,8 +701,8 @@ export class Struct< offset += size; } - if (outputType === "number") { - return structSize; + if (output.length !== structSize) { + return output.subarray(0, structSize); } else { return output; } diff --git a/toolchain/eslint-config/eslint.config.js b/toolchain/eslint-config/eslint.config.js index 566aa164..999dee11 100644 --- a/toolchain/eslint-config/eslint.config.js +++ b/toolchain/eslint-config/eslint.config.js @@ -1,17 +1,11 @@ /// import eslint from "@eslint/js"; -import { existsSync } from "fs"; -import { resolve } from "path"; +import { dirname, resolve } from "path"; import tslint from "typescript-eslint"; +import { fileURLToPath } from "url"; -const cwd = process.cwd(); -const project = []; -if (existsSync(resolve(cwd, "tsconfig.test.json"))) { - project.push("./tsconfig.test.json"); -} else { - project.push("./tsconfig.build.json"); -} +const root = resolve(dirname(fileURLToPath(import.meta.url)), "..", ".."); export default tslint.config( eslint.configs.recommended, @@ -22,8 +16,12 @@ export default tslint.config( { languageOptions: { parserOptions: { - tsconfigRootDir: cwd, - project: project, + tsconfigRootDir: root, + project: [ + "libraries/*/tsconfig.test.json", + "libraries/*/tsconfig.build.json", + "apps/*/tsconfig.build.json", + ], }, }, rules: { @@ -67,15 +65,12 @@ export default tslint.config( "@typescript-eslint/no-namespace": "off", "@typescript-eslint/array-type": "error", "@typescript-eslint/consistent-type-definitions": "error", + "@typescript-eslint/consistent-generic-constructors": "error", + "@typescript-eslint/consistent-indexed-object-style": "error", "@typescript-eslint/no-this-alias": "error", - "@typescript-eslint/consistent-type-imports": [ - "error", - { - prefer: "type-imports", - disallowTypeAnnotations: true, - fixStyle: "inline-type-imports", - }, - ], + "@typescript-eslint/consistent-type-imports": "error", + + "@typescript-eslint/no-import-type-side-effects": "error", }, }, );