diff --git a/.vscode/settings.json b/.vscode/settings.json index d4f89909..0ef8ed8b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -67,5 +67,6 @@ ], "url": "https://developer.microsoft.com/json-schemas/rush/v5/version-policies.schema.json" } - ] + ], + "typescript.preferences.quoteStyle": "single" } diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 340cd550..a0afcd97 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -24,6 +24,7 @@ specifiers: '@rush-temp/demo': file:./projects/demo.tgz '@rush-temp/event': file:./projects/event.tgz '@rush-temp/scrcpy': file:./projects/scrcpy.tgz + '@rush-temp/stream-extra': file:./projects/stream-extra.tgz '@rush-temp/struct': file:./projects/struct.tgz '@rush-temp/ts-package-builder': file:./projects/ts-package-builder.tgz '@rush-temp/unofficial-adb-book': file:./projects/unofficial-adb-book.tgz @@ -85,6 +86,7 @@ dependencies: '@rush-temp/demo': file:projects/demo.tgz_@mdx-js+react@1.6.22 '@rush-temp/event': file:projects/event.tgz_@types+node@17.0.33 '@rush-temp/scrcpy': file:projects/scrcpy.tgz_@types+node@17.0.33 + '@rush-temp/stream-extra': file:projects/stream-extra.tgz_@types+node@17.0.33 '@rush-temp/struct': file:projects/struct.tgz_@types+node@17.0.33 '@rush-temp/ts-package-builder': file:projects/ts-package-builder.tgz '@rush-temp/unofficial-adb-book': file:projects/unofficial-adb-book.tgz_d45f1a34685929383f8ab73cab148e80 @@ -10692,7 +10694,7 @@ packages: dev: false /through/2.3.8: - resolution: {integrity: sha1-DdTJ/6q8NXlgsbckEV1+Doai4fU=} + resolution: {integrity: sha512-w89qg7PI8wAdvX60bMDP+bFoD5Dvhm9oLheFp5O4a2QF0cSBGsBX4qZmadPMvVqlLJBBci+WqGGOAPvcDeNSVg==} dev: false /thunky/1.1.0: @@ -10700,7 +10702,7 @@ packages: dev: false /timed-out/4.0.1: - resolution: {integrity: sha1-8y6srFoXW+ol1/q1Zas+2HQe9W8=} + resolution: {integrity: sha512-G7r3AhovYtr5YKOWQkta8RKAPb+J9IsO4uVmzjl8AZwfhs8UcUwTiD6gcJYSgOtzyjvQKrKYn41syHbUWMkafA==} engines: {node: '>=0.10.0'} dev: false @@ -10762,7 +10764,7 @@ packages: dev: false /trim-repeated/1.0.0: - resolution: {integrity: sha1-42RqLqTokTEr9+rObPsFOAvAHCE=} + resolution: {integrity: sha512-pkonvlKk8/ZuR0D5tLW8ljt5I8kmxp2XKymhepUeOdCEfKpZaktSArkLHZt76OB1ZvO9bssUsDty4SWhLvZpLg==} engines: {node: '>=0.10.0'} dependencies: escape-string-regexp: 1.0.5 @@ -11858,6 +11860,31 @@ packages: - ts-node dev: false + file:projects/stream-extra.tgz_@types+node@17.0.33: + resolution: {integrity: sha512-rJ7QINFBmWnprVVBMtJ4JF8i51LLk1oPvLLiVG642Y/LstYOFULaDJt1lytyBQSfWynM79Hv8JN+ZKaXcQDBhg==, tarball: file:projects/stream-extra.tgz} + id: file:projects/stream-extra.tgz + name: '@rush-temp/stream-extra' + version: 0.0.0 + dependencies: + '@jest/globals': 28.1.0 + '@yume-chan/async': 2.1.4 + cross-env: 7.0.3 + jest: 28.1.0_@types+node@17.0.33 + ts-jest: 28.0.2_jest@28.1.0+typescript@4.7.2 + tslib: 2.4.0 + typescript: 4.7.2 + web-streams-polyfill: 4.0.0-beta.3 + transitivePeerDependencies: + - '@babel/core' + - '@types/jest' + - '@types/node' + - babel-jest + - esbuild + - node-notifier + - supports-color + - ts-node + dev: false + file:projects/struct.tgz_@types+node@17.0.33: resolution: {integrity: sha512-N4tRna6p02qJC7klmbbeELeqARWvpb3GXHjZRRi5DzTumGngXNQyzYpG3VCNZnmZV9a4vgDVJX/CZOiOKbOKMQ==, tarball: file:projects/struct.tgz} id: file:projects/struct.tgz diff --git a/common/config/rush/repo-state.json b/common/config/rush/repo-state.json index b75382bd..8edc08d7 100644 --- a/common/config/rush/repo-state.json +++ b/common/config/rush/repo-state.json @@ -1,4 +1,4 @@ // DO NOT MODIFY THIS FILE MANUALLY BUT DO COMMIT IT. It is generated and used by Rush. { - "pnpmShrinkwrapHash": "3798fc5f6f6c673b1888e77923d3eeace5651923" + "pnpmShrinkwrapHash": "bc1c6912e108986d20555db6e1b3427c76999db1" } diff --git a/libraries/adb-backend-direct-sockets/package.json b/libraries/adb-backend-direct-sockets/package.json index 690a2470..1ef51250 100644 --- a/libraries/adb-backend-direct-sockets/package.json +++ b/libraries/adb-backend-direct-sockets/package.json @@ -36,6 +36,7 @@ }, "dependencies": { "@yume-chan/adb": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "tslib": "^2.3.1" } } diff --git a/libraries/adb-backend-direct-sockets/src/index.ts b/libraries/adb-backend-direct-sockets/src/index.ts index a789b235..6cf2c078 100644 --- a/libraries/adb-backend-direct-sockets/src/index.ts +++ b/libraries/adb-backend-direct-sockets/src/index.ts @@ -1,4 +1,5 @@ -import { AdbBackend, AdbPacket, AdbPacketSerializeStream, pipeFrom, ReadableStream, StructDeserializeStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/adb'; +import { AdbBackend, AdbPacket, AdbPacketSerializeStream } from '@yume-chan/adb'; +import { pipeFrom, ReadableStream, StructDeserializeStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/stream-extra'; declare global { interface TCPSocket { diff --git a/libraries/adb-backend-webusb/package.json b/libraries/adb-backend-webusb/package.json index 9f8d40f4..dcc6dd7e 100644 --- a/libraries/adb-backend-webusb/package.json +++ b/libraries/adb-backend-webusb/package.json @@ -33,6 +33,7 @@ "dependencies": { "@types/w3c-web-usb": "^1.0.4", "@yume-chan/adb": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "@yume-chan/struct": "^0.0.16", "tslib": "^2.3.1" }, diff --git a/libraries/adb-backend-webusb/src/backend.ts b/libraries/adb-backend-webusb/src/backend.ts index 53c62f49..ee6430f6 100644 --- a/libraries/adb-backend-webusb/src/backend.ts +++ b/libraries/adb-backend-webusb/src/backend.ts @@ -1,5 +1,6 @@ -import { AdbPacketHeader, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, WritableStream, type AdbBackend, type AdbPacketData, type AdbPacketInit, type ReadableWritablePair } from '@yume-chan/adb'; -import { EMPTY_UINT8_ARRAY, StructDeserializeStream } from "@yume-chan/struct"; +import { AdbPacketHeader, AdbPacketSerializeStream, type AdbBackend, type AdbPacketData, type AdbPacketInit } from '@yume-chan/adb'; +import { DuplexStreamFactory, pipeFrom, ReadableStream, WritableStream } from '@yume-chan/stream-extra'; +import { EMPTY_UINT8_ARRAY, StructDeserializeStream } from '@yume-chan/struct'; export const ADB_DEVICE_FILTER: USBDeviceFilter = { classCode: 0xFF, @@ -173,6 +174,6 @@ export class AdbWebUsbBackend implements AdbBackend { } } - throw new Error('Unknown error'); + throw new Error('Can not find ADB interface'); } } diff --git a/libraries/adb-backend-ws/package.json b/libraries/adb-backend-ws/package.json index 435168d7..2a186462 100644 --- a/libraries/adb-backend-ws/package.json +++ b/libraries/adb-backend-ws/package.json @@ -36,6 +36,7 @@ }, "dependencies": { "@yume-chan/adb": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "tslib": "^2.3.1" } } diff --git a/libraries/adb-backend-ws/src/index.ts b/libraries/adb-backend-ws/src/index.ts index 196e150d..d5e93dc3 100644 --- a/libraries/adb-backend-ws/src/index.ts +++ b/libraries/adb-backend-ws/src/index.ts @@ -1,4 +1,5 @@ -import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, WritableStream, type AdbBackend } from '@yume-chan/adb'; +import { AdbPacket, AdbPacketSerializeStream, type AdbBackend } from '@yume-chan/adb'; +import { DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, WritableStream } from '@yume-chan/stream-extra'; export default class AdbWsBackend implements AdbBackend { public readonly serial: string; @@ -12,7 +13,7 @@ export default class AdbWsBackend implements AdbBackend { public async connect() { const socket = new WebSocket(this.serial); - socket.binaryType = "arraybuffer"; + socket.binaryType = 'arraybuffer'; await new Promise((resolve, reject) => { socket.onopen = resolve; diff --git a/libraries/adb/package.json b/libraries/adb/package.json index 7d45cbec..9f460ecd 100644 --- a/libraries/adb/package.json +++ b/libraries/adb/package.json @@ -34,6 +34,7 @@ "@yume-chan/async": "^2.1.4", "@yume-chan/dataview-bigint-polyfill": "^0.0.16", "@yume-chan/event": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "@yume-chan/struct": "^0.0.16", "tslib": "^2.3.1", "web-streams-polyfill": "^4.0.0-beta.3" diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index 0d400dd5..673f65eb 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -1,13 +1,14 @@ // cspell: ignore libusb import { PromiseResolver } from '@yume-chan/async'; +import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from '@yume-chan/stream-extra'; + import { AdbAuthenticationProcessor, ADB_DEFAULT_AUTHENTICATORS, type AdbCredentialStore } from './auth.js'; import { AdbPower, AdbReverseCommand, AdbSubprocess, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install, type AdbFrameBuffer } from './commands/index.js'; import { AdbFeatures } from './features.js'; import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from './packet.js'; import { AdbIncomingSocketHandler, AdbPacketDispatcher, type AdbSocket, type Closeable } from './socket/index.js'; -import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from "./stream/index.js"; -import { decodeUtf8, encodeUtf8 } from "./utils/index.js"; +import { decodeUtf8, encodeUtf8 } from './utils/index.js'; export enum AdbPropKey { Product = 'ro.product.name', diff --git a/libraries/adb/src/auth.ts b/libraries/adb/src/auth.ts index 10ca6547..9894a644 100644 --- a/libraries/adb/src/auth.ts +++ b/libraries/adb/src/auth.ts @@ -1,6 +1,7 @@ import { PromiseResolver } from '@yume-chan/async'; import type { Disposable } from '@yume-chan/event'; import type { ValueOrPromise } from '@yume-chan/struct'; + import { calculatePublicKey, calculatePublicKeyLength, sign } from './crypto.js'; import { AdbCommand, type AdbPacketData } from './packet.js'; import { calculateBase64EncodedLength, encodeBase64 } from './utils/index.js'; diff --git a/libraries/adb/src/backend.ts b/libraries/adb/src/backend.ts index b7f1c5a4..b1e6c56f 100644 --- a/libraries/adb/src/backend.ts +++ b/libraries/adb/src/backend.ts @@ -1,6 +1,7 @@ +import type { ReadableWritablePair } from '@yume-chan/stream-extra'; import type { ValueOrPromise } from '@yume-chan/struct'; -import type { AdbPacketData, AdbPacketInit } from "./packet.js"; -import type { ReadableWritablePair } from "./stream/index.js"; + +import type { AdbPacketData, AdbPacketInit } from './packet.js'; export interface AdbBackend { readonly serial: string; diff --git a/libraries/adb/src/commands/base.ts b/libraries/adb/src/commands/base.ts index df413032..1e5ac5a5 100644 --- a/libraries/adb/src/commands/base.ts +++ b/libraries/adb/src/commands/base.ts @@ -1,4 +1,5 @@ import { AutoDisposable } from '@yume-chan/event'; + import type { Adb } from '../adb.js'; export class AdbCommandBase extends AutoDisposable { diff --git a/libraries/adb/src/commands/framebuffer.ts b/libraries/adb/src/commands/framebuffer.ts index 7515b34c..cf645b17 100644 --- a/libraries/adb/src/commands/framebuffer.ts +++ b/libraries/adb/src/commands/framebuffer.ts @@ -1,6 +1,7 @@ -import Struct from "@yume-chan/struct"; +import { BufferedStream } from '@yume-chan/stream-extra'; +import Struct from '@yume-chan/struct'; + import type { Adb } from '../adb.js'; -import { AdbBufferedStream } from '../stream/index.js'; const Version = new Struct({ littleEndian: true }) @@ -60,7 +61,7 @@ export type AdbFrameBuffer = AdbFrameBufferV1 | AdbFrameBufferV2; export async function framebuffer(adb: Adb): Promise { const socket = await adb.createSocket('framebuffer:'); - const stream = new AdbBufferedStream(socket); + const stream = new BufferedStream(socket.readable); const { version } = await Version.deserialize(stream); switch (version) { case 1: diff --git a/libraries/adb/src/commands/install.ts b/libraries/adb/src/commands/install.ts index 801421b3..1a4d43d7 100644 --- a/libraries/adb/src/commands/install.ts +++ b/libraries/adb/src/commands/install.ts @@ -1,7 +1,8 @@ -import type { Adb } from "../adb.js"; -import { WrapWritableStream, WritableStream } from "../stream/index.js"; -import { escapeArg } from "./subprocess/index.js"; -import type { AdbSync } from "./sync/index.js"; +import { WrapWritableStream, WritableStream } from '@yume-chan/stream-extra'; + +import type { Adb } from '../adb.js'; +import { escapeArg } from './subprocess/index.js'; +import type { AdbSync } from './sync/index.js'; export function install( adb: Adb, diff --git a/libraries/adb/src/commands/power.ts b/libraries/adb/src/commands/power.ts index 696ed345..6cc68e47 100644 --- a/libraries/adb/src/commands/power.ts +++ b/libraries/adb/src/commands/power.ts @@ -3,7 +3,7 @@ // cspell: ignore keyevent // cspell: ignore longpress -import { AdbCommandBase } from "./base.js"; +import { AdbCommandBase } from './base.js'; export class AdbPower extends AdbCommandBase { public reboot(name: string = '') { diff --git a/libraries/adb/src/commands/reverse.ts b/libraries/adb/src/commands/reverse.ts index 4b8700b2..4219e214 100644 --- a/libraries/adb/src/commands/reverse.ts +++ b/libraries/adb/src/commands/reverse.ts @@ -1,11 +1,12 @@ // cspell: ignore killforward import { AutoDisposable } from '@yume-chan/event'; +import { BufferedStream, BufferedStreamEndedError } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import type { Adb } from "../adb.js"; + +import type { Adb } from '../adb.js'; import type { AdbIncomingSocketHandler, AdbSocket } from '../socket/index.js'; -import { AdbBufferedStream, BufferedStreamEndedError } from '../stream/index.js'; -import { decodeUtf8 } from "../utils/index.js"; +import { decodeUtf8 } from '../utils/index.js'; export interface AdbForwardListener { deviceSerial: string; @@ -52,7 +53,7 @@ export class AdbReverseCommand extends AutoDisposable { private async createBufferedStream(service: string) { const socket = await this.adb.createSocket(service); - return new AdbBufferedStream(socket); + return new BufferedStream(socket.readable); } private async sendRequest(service: string) { diff --git a/libraries/adb/src/commands/subprocess/command.ts b/libraries/adb/src/commands/subprocess/command.ts new file mode 100644 index 00000000..d85bc63d --- /dev/null +++ b/libraries/adb/src/commands/subprocess/command.ts @@ -0,0 +1,131 @@ +import { GatherStringStream, DecodeUtf8Stream } from '@yume-chan/stream-extra'; + +import { AdbCommandBase } from '../base.js'; +import { AdbSubprocessNoneProtocol, AdbSubprocessProtocol, AdbSubprocessProtocolConstructor, AdbSubprocessShellProtocol } from './protocols/index.js'; + +export interface AdbSubprocessOptions { + /** + * A list of `AdbSubprocessProtocolConstructor`s to be used. + * + * Different `AdbSubprocessProtocol` has different capabilities, thus requires specific adaptations. + * Check their documentations for details. + * + * The first protocol whose `isSupported` returns `true` will be used. + * If no `AdbSubprocessProtocol` is supported, an error will be thrown. + * + * @default [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol] + */ + protocols: AdbSubprocessProtocolConstructor[]; +} + +const DEFAULT_OPTIONS: AdbSubprocessOptions = { + protocols: [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol], +}; + +export interface AdbSubprocessWaitResult { + stdout: string; + stderr: string; + exitCode: number; +} + +export class AdbSubprocess extends AdbCommandBase { + private async createProtocol( + mode: 'pty' | 'raw', + command?: string | string[], + options?: Partial + ): Promise { + const { protocols } = { ...DEFAULT_OPTIONS, ...options }; + + let Constructor: AdbSubprocessProtocolConstructor | undefined; + for (const item of protocols) { + // It's async so can't use `Array#find` + if (await item.isSupported(this.adb)) { + Constructor = item; + break; + } + } + + if (!Constructor) { + throw new Error('No specified protocol is supported by the device'); + } + + if (Array.isArray(command)) { + command = command.join(' '); + } else if (command === undefined) { + // spawn the default shell + command = ''; + } + return await Constructor[mode](this.adb, command); + } + + /** + * Spawns an executable in PTY (interactive) mode. + * @param command The command to run. If omitted, the default shell will be spawned. + * @param options The options for creating the `AdbSubprocessProtocol` + * @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process. + */ + public shell( + command?: string | string[], + options?: Partial + ): Promise { + return this.createProtocol('pty', command, options); + } + + /** + * Spawns an executable and pipe the output. + * @param command The command to run, or an array of strings containing both command and args. + * @param options The options for creating the `AdbSubprocessProtocol` + * @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process. + */ + public spawn( + command: string | string[], + options?: Partial + ): Promise { + return this.createProtocol('raw', command, options); + } + + /** + * Spawns a new process, waits until it exits, and returns the entire output. + * @param command The command to run + * @param options The options for creating the `AdbSubprocessProtocol` + * @returns The entire output of the command + */ + public async spawnAndWait( + command: string | string[], + options?: Partial + ): Promise { + const shell = await this.spawn(command, options); + + const stdout = new GatherStringStream(); + const stderr = new GatherStringStream(); + + const [, , exitCode] = await Promise.all([ + shell.stdout + .pipeThrough(new DecodeUtf8Stream()) + .pipeTo(stdout), + shell.stderr + .pipeThrough(new DecodeUtf8Stream()) + .pipeTo(stderr), + shell.exit + ]); + + return { + stdout: stdout.result, + stderr: stderr.result, + exitCode, + }; + } + + /** + * Spawns a new process, waits until it exits, and returns the entire output. + * @param command The command to run + * @returns The entire output of the command + */ + public async spawnAndWaitLegacy(command: string | string[]): Promise { + const { stdout } = await this.spawnAndWait( + command, + { protocols: [AdbSubprocessNoneProtocol] } + ); + return stdout; + } +} diff --git a/libraries/adb/src/commands/subprocess/index.ts b/libraries/adb/src/commands/subprocess/index.ts index 16974065..5d5b3cf1 100644 --- a/libraries/adb/src/commands/subprocess/index.ts +++ b/libraries/adb/src/commands/subprocess/index.ts @@ -1,139 +1,3 @@ -import type { Adb } from '../../adb.js'; -import { DecodeUtf8Stream, GatherStringStream } from "../../stream/index.js"; -import { AdbSubprocessNoneProtocol, AdbSubprocessShellProtocol, type AdbSubprocessProtocol, type AdbSubprocessProtocolConstructor } from './protocols/index.js'; - +export * from './command.js'; export * from './protocols/index.js'; export * from './utils.js'; - -export interface AdbSubprocessOptions { - /** - * A list of `AdbSubprocessProtocolConstructor`s to be used. - * - * Different `AdbSubprocessProtocol` has different capabilities, thus requires specific adaptations. - * Check their documentations for details. - * - * The first protocol whose `isSupported` returns `true` will be used. - * If no `AdbSubprocessProtocol` is supported, an error will be thrown. - * - * @default [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol] - */ - protocols: AdbSubprocessProtocolConstructor[]; -} - -const DEFAULT_OPTIONS: AdbSubprocessOptions = { - protocols: [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol], -}; - -export interface AdbSubprocessWaitResult { - stdout: string; - stderr: string; - exitCode: number; -} - -export class AdbSubprocess { - public readonly adb: Adb; - - public constructor(adb: Adb) { - this.adb = adb; - } - - private async createProtocol( - mode: 'pty' | 'raw', - command?: string | string[], - options?: Partial - ): Promise { - const { protocols } = { ...DEFAULT_OPTIONS, ...options }; - - let Constructor: AdbSubprocessProtocolConstructor | undefined; - for (const item of protocols) { - // It's async so can't use `Array#find` - if (await item.isSupported(this.adb)) { - Constructor = item; - break; - } - } - - if (!Constructor) { - throw new Error('No specified protocol is supported by the device'); - } - - if (Array.isArray(command)) { - command = command.join(' '); - } else if (command === undefined) { - // spawn the default shell - command = ''; - } - return await Constructor[mode](this.adb, command); - } - - /** - * Spawns an executable in PTY (interactive) mode. - * @param command The command to run. If omitted, the default shell will be spawned. - * @param options The options for creating the `AdbSubprocessProtocol` - * @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process. - */ - public shell( - command?: string | string[], - options?: Partial - ): Promise { - return this.createProtocol('pty', command, options); - } - - /** - * Spawns an executable and pipe the output. - * @param command The command to run, or an array of strings containing both command and args. - * @param options The options for creating the `AdbSubprocessProtocol` - * @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process. - */ - public spawn( - command: string | string[], - options?: Partial - ): Promise { - return this.createProtocol('raw', command, options); - } - - /** - * Spawns a new process, waits until it exits, and returns the entire output. - * @param command The command to run - * @param options The options for creating the `AdbSubprocessProtocol` - * @returns The entire output of the command - */ - public async spawnAndWait( - command: string | string[], - options?: Partial - ): Promise { - const shell = await this.spawn(command, options); - - const stdout = new GatherStringStream(); - const stderr = new GatherStringStream(); - - const [, , exitCode] = await Promise.all([ - shell.stdout - .pipeThrough(new DecodeUtf8Stream()) - .pipeTo(stdout), - shell.stderr - .pipeThrough(new DecodeUtf8Stream()) - .pipeTo(stderr), - shell.exit - ]); - - return { - stdout: stdout.result, - stderr: stderr.result, - exitCode, - }; - } - - /** - * Spawns a new process, waits until it exits, and returns the entire output. - * @param command The command to run - * @returns The entire output of the command - */ - public async spawnAndWaitLegacy(command: string | string[]): Promise { - const { stdout } = await this.spawnAndWait( - command, - { protocols: [AdbSubprocessNoneProtocol] } - ); - return stdout; - } -} diff --git a/libraries/adb/src/commands/subprocess/protocols/none.ts b/libraries/adb/src/commands/subprocess/protocols/none.ts index ae516cc2..10f70040 100644 --- a/libraries/adb/src/commands/subprocess/protocols/none.ts +++ b/libraries/adb/src/commands/subprocess/protocols/none.ts @@ -1,7 +1,8 @@ -import type { Adb } from "../../../adb.js"; -import type { AdbSocket } from "../../../socket/index.js"; -import { DuplexStreamFactory, ReadableStream } from "../../../stream/index.js"; -import type { AdbSubprocessProtocol } from "./types.js"; +import { DuplexStreamFactory, ReadableStream } from '@yume-chan/stream-extra'; + +import type { Adb } from '../../../adb.js'; +import type { AdbSocket } from '../../../socket/index.js'; +import type { AdbSubprocessProtocol } from './types.js'; /** * The legacy shell diff --git a/libraries/adb/src/commands/subprocess/protocols/shell.ts b/libraries/adb/src/commands/subprocess/protocols/shell.ts index 7d45d351..a3f7053f 100644 --- a/libraries/adb/src/commands/subprocess/protocols/shell.ts +++ b/libraries/adb/src/commands/subprocess/protocols/shell.ts @@ -1,11 +1,12 @@ -import { PromiseResolver } from "@yume-chan/async"; -import Struct, { placeholder, type StructValueType } from "@yume-chan/struct"; -import type { Adb } from "../../../adb.js"; -import { AdbFeatures } from "../../../features.js"; -import type { AdbSocket } from "../../../socket/index.js"; -import { pipeFrom, PushReadableStream, ReadableStream, StructDeserializeStream, StructSerializeStream, TransformStream, WritableStream, WritableStreamDefaultWriter, type PushReadableStreamController } from "../../../stream/index.js"; -import { encodeUtf8 } from "../../../utils/index.js"; -import type { AdbSubprocessProtocol } from "./types.js"; +import { PromiseResolver } from '@yume-chan/async'; +import { pipeFrom, PushReadableStream, StructDeserializeStream, StructSerializeStream, TransformStream, WritableStream, type WritableStreamDefaultWriter, type PushReadableStreamController, type ReadableStream } from '@yume-chan/stream-extra'; +import Struct, { placeholder, type StructValueType } from '@yume-chan/struct'; + +import type { Adb } from '../../../adb.js'; +import { AdbFeatures } from '../../../features.js'; +import type { AdbSocket } from '../../../socket/index.js'; +import { encodeUtf8 } from '../../../utils/index.js'; +import type { AdbSubprocessProtocol } from './types.js'; export enum AdbShellProtocolId { Stdin, diff --git a/libraries/adb/src/commands/subprocess/protocols/types.ts b/libraries/adb/src/commands/subprocess/protocols/types.ts index 61e82c89..83a4a8e5 100644 --- a/libraries/adb/src/commands/subprocess/protocols/types.ts +++ b/libraries/adb/src/commands/subprocess/protocols/types.ts @@ -1,7 +1,8 @@ -import type { ValueOrPromise } from "@yume-chan/struct"; -import type { Adb } from "../../../adb.js"; -import type { AdbSocket } from "../../../socket/index.js"; -import type { ReadableStream, WritableStream } from "../../../stream/index.js"; +import type { ReadableStream, WritableStream } from '@yume-chan/stream-extra'; +import type { ValueOrPromise } from '@yume-chan/struct'; + +import type { Adb } from '../../../adb.js'; +import type { AdbSocket } from '../../../socket/index.js'; export interface AdbSubprocessProtocol { /** diff --git a/libraries/adb/src/commands/sync/index.ts b/libraries/adb/src/commands/sync/index.ts index aa90a681..0b08f95f 100644 --- a/libraries/adb/src/commands/sync/index.ts +++ b/libraries/adb/src/commands/sync/index.ts @@ -1,7 +1,7 @@ export * from './list.js'; export * from './pull.js'; +export * from './push.js'; export * from './request.js'; export * from './response.js'; -export * from './push.js'; export * from './stat.js'; export * from './sync.js'; diff --git a/libraries/adb/src/commands/sync/list.ts b/libraries/adb/src/commands/sync/list.ts index 08bb8069..3882bad3 100644 --- a/libraries/adb/src/commands/sync/list.ts +++ b/libraries/adb/src/commands/sync/list.ts @@ -1,5 +1,6 @@ +import type { BufferedStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import type { AdbBufferedStream, WritableStreamDefaultWriter } from '../../stream/index.js'; + import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncDoneResponse, adbSyncReadResponse, AdbSyncResponseId } from './response.js'; import { AdbSyncLstatResponse, AdbSyncStatResponse, type AdbSyncStat } from './stat.js'; @@ -37,7 +38,7 @@ const LIST_V2_RESPONSE_TYPES = { }; export async function* adbSyncOpenDir( - stream: AdbBufferedStream, + stream: BufferedStream, writer: WritableStreamDefaultWriter, path: string, v2: boolean, diff --git a/libraries/adb/src/commands/sync/pull.ts b/libraries/adb/src/commands/sync/pull.ts index 604553fc..184ddbb9 100644 --- a/libraries/adb/src/commands/sync/pull.ts +++ b/libraries/adb/src/commands/sync/pull.ts @@ -1,5 +1,6 @@ +import { BufferedStream, ReadableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import { AdbBufferedStream, ReadableStream, WritableStreamDefaultWriter } from '../../stream/index.js'; + import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncDoneResponse, adbSyncReadResponse, AdbSyncResponseId } from './response.js'; @@ -15,7 +16,7 @@ const RESPONSE_TYPES = { }; export function adbSyncPull( - stream: AdbBufferedStream, + stream: BufferedStream, writer: WritableStreamDefaultWriter, path: string, ): ReadableStream { diff --git a/libraries/adb/src/commands/sync/push.ts b/libraries/adb/src/commands/sync/push.ts index c68cfad2..be720959 100644 --- a/libraries/adb/src/commands/sync/push.ts +++ b/libraries/adb/src/commands/sync/push.ts @@ -1,5 +1,6 @@ +import { BufferedStream, ChunkStream, pipeFrom, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import { AdbBufferedStream, ChunkStream, pipeFrom, WritableStream, WritableStreamDefaultWriter } from '../../stream/index.js'; + import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { adbSyncReadResponse, AdbSyncResponseId } from './response.js'; import { LinuxFileType } from './stat.js'; @@ -15,7 +16,7 @@ const ResponseTypes = { export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024; export function adbSyncPush( - stream: AdbBufferedStream, + stream: BufferedStream, writer: WritableStreamDefaultWriter, filename: string, mode: number = (LinuxFileType.File << 12) | 0o666, diff --git a/libraries/adb/src/commands/sync/request.ts b/libraries/adb/src/commands/sync/request.ts index e835dcfe..cf850b28 100644 --- a/libraries/adb/src/commands/sync/request.ts +++ b/libraries/adb/src/commands/sync/request.ts @@ -1,6 +1,7 @@ +import type { WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import type { WritableStreamDefaultWriter } from "../../stream/index.js"; -import { encodeUtf8 } from "../../utils/index.js"; + +import { encodeUtf8 } from '../../utils/index.js'; export enum AdbSyncRequestId { List = 'LIST', diff --git a/libraries/adb/src/commands/sync/response.ts b/libraries/adb/src/commands/sync/response.ts index 3c9487db..38c0e113 100644 --- a/libraries/adb/src/commands/sync/response.ts +++ b/libraries/adb/src/commands/sync/response.ts @@ -1,6 +1,7 @@ +import type { BufferedStream } from '@yume-chan/stream-extra'; import Struct, { type StructAsyncDeserializeStream, type StructLike, type StructValueType } from '@yume-chan/struct'; -import type { AdbBufferedStream } from '../../stream/index.js'; -import { decodeUtf8 } from "../../utils/index.js"; + +import { decodeUtf8 } from '../../utils/index.js'; export enum AdbSyncResponseId { Entry = 'DENT', @@ -42,7 +43,7 @@ export const AdbSyncFailResponse = }); export async function adbSyncReadResponse>>( - stream: AdbBufferedStream, + stream: BufferedStream, types: T, // When `T` is a union type, `T[keyof T]` only includes their common keys. // For example, let `type T = { a: string, b: string } | { a: string, c: string}`, diff --git a/libraries/adb/src/commands/sync/stat.ts b/libraries/adb/src/commands/sync/stat.ts index 0d8c44ec..766769cb 100644 --- a/libraries/adb/src/commands/sync/stat.ts +++ b/libraries/adb/src/commands/sync/stat.ts @@ -1,5 +1,6 @@ +import type { BufferedStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct, { placeholder } from '@yume-chan/struct'; -import type { AdbBufferedStream, WritableStreamDefaultWriter } from '../../stream/index.js'; + import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { adbSyncReadResponse, AdbSyncResponseId } from './response.js'; @@ -108,7 +109,7 @@ const LSTAT_V2_RESPONSE_TYPES = { }; export async function adbSyncLstat( - stream: AdbBufferedStream, + stream: BufferedStream, writer: WritableStreamDefaultWriter, path: string, v2: boolean, @@ -142,7 +143,7 @@ export async function adbSyncLstat( } export async function adbSyncStat( - stream: AdbBufferedStream, + stream: BufferedStream, writer: WritableStreamDefaultWriter, path: string, ): Promise { diff --git a/libraries/adb/src/commands/sync/sync.ts b/libraries/adb/src/commands/sync/sync.ts index 9f6d27f4..0415480b 100644 --- a/libraries/adb/src/commands/sync/sync.ts +++ b/libraries/adb/src/commands/sync/sync.ts @@ -1,10 +1,11 @@ import { AutoDisposable } from '@yume-chan/event'; +import { BufferedStream, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; + import type { Adb } from '../../adb.js'; import { AdbFeatures } from '../../features.js'; import type { AdbSocket } from '../../socket/index.js'; -import { AdbBufferedStream, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream, WritableStreamDefaultWriter } from '../../stream/index.js'; import { AutoResetEvent } from '../../utils/index.js'; -import { escapeArg } from "../index.js"; +import { escapeArg } from '../subprocess/index.js'; import { adbSyncOpenDir, type AdbSyncEntry } from './list.js'; import { adbSyncPull } from './pull.js'; import { adbSyncPush } from './push.js'; @@ -29,7 +30,7 @@ export function dirname(path: string): string { export class AdbSync extends AutoDisposable { protected adb: Adb; - protected stream: AdbBufferedStream; + protected stream: BufferedStream; protected writer: WritableStreamDefaultWriter; @@ -56,7 +57,7 @@ export class AdbSync extends AutoDisposable { super(); this.adb = adb; - this.stream = new AdbBufferedStream(socket); + this.stream = new BufferedStream(socket.readable); this.writer = socket.writable.getWriter(); } diff --git a/libraries/adb/src/features.ts b/libraries/adb/src/features.ts index cb432f28..aad677e8 100644 --- a/libraries/adb/src/features.ts +++ b/libraries/adb/src/features.ts @@ -1,7 +1,7 @@ // The order follows // https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 export enum AdbFeatures { - ShellV2 = "shell_v2", + ShellV2 = 'shell_v2', Cmd = 'cmd', StatV2 = 'stat_v2', ListV2 = 'ls_v2', diff --git a/libraries/adb/src/index.ts b/libraries/adb/src/index.ts index cc899f5f..ea616238 100644 --- a/libraries/adb/src/index.ts +++ b/libraries/adb/src/index.ts @@ -7,5 +7,4 @@ export * from './crypto.js'; export * from './features.js'; export * from './packet.js'; export * from './socket/index.js'; -export * from './stream/index.js'; export * from './utils/index.js'; diff --git a/libraries/adb/src/packet.ts b/libraries/adb/src/packet.ts index fc15da6b..fd44db94 100644 --- a/libraries/adb/src/packet.ts +++ b/libraries/adb/src/packet.ts @@ -1,5 +1,5 @@ +import { TransformStream } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; -import { TransformStream } from "./stream/index.js"; export enum AdbCommand { Auth = 0x48545541, // 'AUTH' diff --git a/libraries/adb/src/socket/dispatcher.ts b/libraries/adb/src/socket/dispatcher.ts index 4bb8bce7..6c615b60 100644 --- a/libraries/adb/src/socket/dispatcher.ts +++ b/libraries/adb/src/socket/dispatcher.ts @@ -1,9 +1,9 @@ import { AsyncOperationManager, PromiseResolver } from '@yume-chan/async'; import type { RemoveEventListener } from '@yume-chan/event'; -import { EMPTY_UINT8_ARRAY, type ValueOrPromise } from "@yume-chan/struct"; +import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '@yume-chan/stream-extra'; +import { EMPTY_UINT8_ARRAY, type ValueOrPromise } from '@yume-chan/struct'; import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from '../packet.js'; -import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '../stream/index.js'; import { decodeUtf8, encodeUtf8 } from '../utils/index.js'; import { AdbSocket, AdbSocketController } from './socket.js'; diff --git a/libraries/adb/src/socket/index.ts b/libraries/adb/src/socket/index.ts index ecebfbfb..91dc80f3 100644 --- a/libraries/adb/src/socket/index.ts +++ b/libraries/adb/src/socket/index.ts @@ -1,2 +1,2 @@ -export * from './socket.js'; export * from './dispatcher.js'; +export * from './socket.js'; diff --git a/libraries/adb/src/socket/socket.ts b/libraries/adb/src/socket/socket.ts index d42d0d0a..79c2cce6 100644 --- a/libraries/adb/src/socket/socket.ts +++ b/libraries/adb/src/socket/socket.ts @@ -1,7 +1,8 @@ -import { PromiseResolver } from "@yume-chan/async"; -import type { Disposable } from "@yume-chan/event"; +import { PromiseResolver } from '@yume-chan/async'; +import type { Disposable } from '@yume-chan/event'; +import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, WritableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair } from '@yume-chan/stream-extra'; + import { AdbCommand } from '../packet.js'; -import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, WritableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair } from '../stream/index.js'; import type { AdbPacketDispatcher, Closeable } from './dispatcher.js'; export interface AdbSocketInfo { diff --git a/libraries/adb/src/stream/detect.ts b/libraries/adb/src/stream/detect.ts deleted file mode 100644 index ea2dabfa..00000000 --- a/libraries/adb/src/stream/detect.ts +++ /dev/null @@ -1,7 +0,0 @@ -// cspell: ignore vercel - -// Always use polyfilled version because -// Vercel doesn't support Node.js 16 (`streams/web` module) yet -export * from './detect.polyfill.js'; - -// export * from './detect.native.js'; diff --git a/libraries/adb/src/stream/index.ts b/libraries/adb/src/stream/index.ts deleted file mode 100644 index bba009b6..00000000 --- a/libraries/adb/src/stream/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from './buffered.js'; -export * from './detect.js'; -export * from './transform.js'; diff --git a/libraries/adb/src/stream/transform.ts b/libraries/adb/src/stream/transform.ts deleted file mode 100644 index 9229963d..00000000 --- a/libraries/adb/src/stream/transform.ts +++ /dev/null @@ -1,474 +0,0 @@ -import { PromiseResolver } from "@yume-chan/async"; -import type Struct from "@yume-chan/struct"; -import type { StructValueType, ValueOrPromise } from "@yume-chan/struct"; -import { decodeUtf8 } from "../utils/index.js"; -import { BufferedStream, BufferedStreamEndedError } from "./buffered.js"; -import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair } from "./detect.js"; - -export interface DuplexStreamFactoryOptions { - /** - * Callback when any `ReadableStream` is cancelled (the user doesn't need any more data), - * or `WritableStream` is ended (the user won't produce any more data), - * or `DuplexStreamFactory#close` is called. - * - * Usually you want to let the other peer know that the duplex stream should be clsoed. - * - * `dispose` will automatically be called after `close` completes, - * but if you want to wait another peer for a close confirmation and call - * `DuplexStreamFactory#dispose` yourself, you can return `false` - * (or a `Promise` that resolves to `false`) to disable the automatic call. - */ - close?: (() => ValueOrPromise) | undefined; - - /** - * Callback when any `ReadableStream` is closed (the other peer doesn't produce any more data), - * or `WritableStream` is aborted (the other peer can't receive any more data), - * or `DuplexStreamFactory#abort` is called. - * - * Usually indicates the other peer has closed the duplex stream. You can clean up - * any resources you have allocated now. - */ - dispose?: (() => void | Promise) | undefined; -} - -/** - * A factory for creating a duplex stream. - * - * It can create multiple `ReadableStream`s and `WritableStream`s, - * when any of them is closed, all other streams will be closed as well. - */ -export class DuplexStreamFactory { - private readableControllers: ReadableStreamDefaultController[] = []; - private writers: WritableStreamDefaultWriter[] = []; - - private _writableClosed = false; - public get writableClosed() { return this._writableClosed; } - - private _closed = new PromiseResolver(); - public get closed() { return this._closed.promise; } - - private options: DuplexStreamFactoryOptions; - - public constructor(options?: DuplexStreamFactoryOptions) { - this.options = options ?? {}; - } - - public wrapReadable(readable: ReadableStream): WrapReadableStream { - return new WrapReadableStream({ - start: (controller) => { - this.readableControllers.push(controller); - return readable; - }, - cancel: async () => { - // cancel means the local peer closes the connection first. - await this.close(); - }, - close: async () => { - // stream end means the remote peer closed the connection first. - await this.dispose(); - }, - }); - } - - public createWritable(stream: WritableStream): WritableStream { - const writer = stream.getWriter(); - this.writers.push(writer); - - // `WritableStream` has no way to tell if the remote peer has closed the connection. - // So it only triggers `close`. - return new WritableStream({ - write: async (chunk) => { - await writer.ready; - await writer.write(chunk); - }, - abort: async (reason) => { - await writer.abort(reason); - await this.close(); - }, - close: async () => { - try { await writer.close(); } catch { } - await this.close(); - }, - }); - } - - public async close() { - if (this._writableClosed) { - return; - } - this._writableClosed = true; - - // Call `close` first, so it can still write data to `WritableStream`s. - if (await this.options.close?.() !== false) { - // `close` can return `false` to disable automatic `dispose`. - await this.dispose(); - } - - for (const writer of this.writers) { - try { await writer.close(); } catch { } - } - } - - public async dispose() { - this._writableClosed = true; - this._closed.resolve(); - - for (const controller of this.readableControllers) { - try { controller.close(); } catch { } - } - - await this.options.dispose?.(); - } -} - -export class DecodeUtf8Stream extends TransformStream{ - public constructor() { - super({ - transform(chunk, controller) { - controller.enqueue(decodeUtf8(chunk)); - }, - }); - } -} - -export class GatherStringStream extends WritableStream{ - // Optimization: rope (concat strings) is faster than `[].join('')` - private _result = ''; - public get result() { return this._result; } - - public constructor() { - super({ - write: (chunk) => { - this._result += chunk; - }, - }); - } -} - -// TODO: StructTransformStream: Looking for better implementation -export class StructDeserializeStream> - implements ReadableWritablePair>{ - private _readable: ReadableStream>; - public get readable() { return this._readable; } - - private _writable: WritableStream; - public get writable() { return this._writable; } - - public constructor(struct: T) { - // Convert incoming chunks to a `BufferedStream` - let incomingStreamController!: PushReadableStreamController; - const incomingStream = new BufferedStream( - new PushReadableStream( - controller => incomingStreamController = controller, - ) - ); - - this._readable = new ReadableStream>({ - async pull(controller) { - try { - const value = await struct.deserialize(incomingStream); - controller.enqueue(value); - } catch (e) { - if (e instanceof BufferedStreamEndedError) { - controller.close(); - return; - } - throw e; - } - } - }); - - this._writable = new WritableStream({ - async write(chunk) { - await incomingStreamController.enqueue(chunk); - }, - abort() { - incomingStreamController.close(); - }, - close() { - incomingStreamController.close(); - }, - }); - } -} - -export class StructSerializeStream> - extends TransformStream{ - constructor(struct: T) { - super({ - transform(chunk, controller) { - controller.enqueue(struct.serialize(chunk)); - }, - }); - } -} - -export type WrapWritableStreamStart = () => ValueOrPromise>; - -export interface WritableStreamWrapper { - start: WrapWritableStreamStart; - close?(): Promise; -} - -async function getWrappedWritableStream( - wrapper: WritableStream | WrapWritableStreamStart | WritableStreamWrapper -) { - if ('start' in wrapper) { - return await wrapper.start(); - } else if (typeof wrapper === 'function') { - return await wrapper(); - } else { - // Can't use `wrapper instanceof WritableStream` - // Because we want to be compatible with any WritableStream-like objects - return wrapper; - } -} - -export class WrapWritableStream extends WritableStream { - public writable!: WritableStream; - - private writer!: WritableStreamDefaultWriter; - - public constructor(wrapper: WritableStream | WrapWritableStreamStart | WritableStreamWrapper) { - super({ - start: async () => { - // `start` is invoked before `ReadableStream`'s constructor finish, - // so using `this` synchronously causes - // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". - // Queue a microtask to avoid this. - await Promise.resolve(); - - this.writable = await getWrappedWritableStream(wrapper); - this.writer = this.writable.getWriter(); - }, - write: async (chunk) => { - // Maintain back pressure - await this.writer.ready; - await this.writer.write(chunk); - }, - abort: async (reason) => { - await this.writer.abort(reason); - if ('close' in wrapper) { - await wrapper.close?.(); - } - }, - close: async () => { - // Close the inner stream first. - // Usually the inner stream is a logical sub-stream over the outer stream, - // closing the outer stream first will make the inner stream incapable of - // sending data in its `close` handler. - await this.writer.close(); - if ('close' in wrapper) { - await wrapper.close?.(); - } - }, - }); - } -} - -export type WrapReadableStreamStart = (controller: ReadableStreamDefaultController) => ValueOrPromise>; - -export interface ReadableStreamWrapper { - start: WrapReadableStreamStart; - cancel?(reason?: any): ValueOrPromise; - close?(): ValueOrPromise; -} - -function getWrappedReadableStream( - wrapper: ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper, - controller: ReadableStreamDefaultController -) { - if ('start' in wrapper) { - return wrapper.start(controller); - } else if (typeof wrapper === 'function') { - return wrapper(controller); - } else { - // Can't use `wrapper instanceof ReadableStream` - // Because we want to be compatible with any ReadableStream-like objects - return wrapper; - } -} - -/** - * This class has multiple usages: - * - * 1. Get notified when the stream is cancelled or closed. - * 2. Synchronously create a `ReadableStream` by asynchronously return another `ReadableStream`. - * 3. Convert native `ReadableStream`s to polyfilled ones so they can `pipe` between. - */ -export class WrapReadableStream extends ReadableStream{ - public readable!: ReadableStream; - - private reader!: ReadableStreamDefaultReader; - - public constructor(wrapper: ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper) { - super({ - start: async (controller) => { - // `start` is invoked before `ReadableStream`'s constructor finish, - // so using `this` synchronously causes - // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". - // Queue a microtask to avoid this. - await Promise.resolve(); - - this.readable = await getWrappedReadableStream(wrapper, controller); - this.reader = this.readable.getReader(); - }, - cancel: async (reason) => { - await this.reader.cancel(reason); - if ('cancel' in wrapper) { - await wrapper.cancel?.(reason); - } - }, - pull: async (controller) => { - const result = await this.reader.read(); - if (result.done) { - controller.close(); - if ('close' in wrapper) { - await wrapper.close?.(); - } - } else { - controller.enqueue(result.value); - } - } - }); - } -} - -export class ChunkStream extends TransformStream{ - public constructor(size: number) { - super({ - transform(chunk, controller) { - for (let start = 0; start < chunk.byteLength;) { - const end = start + size; - controller.enqueue(chunk.subarray(start, end)); - start = end; - } - } - }); - } -} - -function* splitLines(text: string): Generator { - let start = 0; - - while (true) { - const index = text.indexOf('\n', start); - if (index === -1) { - return; - } - - const line = text.substring(start, index); - yield line; - - start = index + 1; - } -} - -export class SplitLineStream extends TransformStream { - public constructor() { - super({ - transform(chunk, controller) { - for (const line of splitLines(chunk)) { - controller.enqueue(line); - } - } - }); - } -} - -/** - * Create a new `WritableStream` that, when written to, will write that chunk to - * `pair.writable`, when pipe `pair.readable` to `writable`. - * - * It's the opposite of `ReadableStream.pipeThrough`. - * - * @param writable The `WritableStream` to write to. - * @param pair A `TransformStream` that converts chunks. - * @returns A new `WritableStream`. - */ -export function pipeFrom(writable: WritableStream, pair: ReadableWritablePair) { - const writer = pair.writable.getWriter(); - const pipe = pair.readable - .pipeTo(writable); - return new WritableStream({ - async write(chunk) { - await writer.ready; - await writer.write(chunk); - }, - async close() { - await writer.close(); - await pipe; - } - }); -} - -export class InspectStream extends TransformStream { - constructor(callback: (value: T) => void) { - super({ - transform(chunk, controller) { - callback(chunk); - controller.enqueue(chunk); - } - }); - } -} - -export interface PushReadableStreamController { - abortSignal: AbortSignal; - - enqueue(chunk: T): Promise; - - close(): void; - - error(e?: any): void; -} - -export type PushReadableStreamSource = (controller: PushReadableStreamController) => void; - -export class PushReadableStream extends ReadableStream { - public constructor(source: PushReadableStreamSource, strategy?: QueuingStrategy) { - let waterMarkLow: PromiseResolver | undefined; - const canceled: AbortController = new AbortController(); - - super({ - start: (controller) => { - source({ - abortSignal: canceled.signal, - async enqueue(chunk) { - if (canceled.signal.aborted) { - // If the stream is already cancelled, - // throw immediately. - throw canceled.signal.reason ?? new Error('Aborted'); - } - - // Only when the stream is errored, `desiredSize` will be `null`. - // But since `null <= 0` is `true` - // (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`), - // not handling it will cause a deadlock. - if ((controller.desiredSize ?? 1) <= 0) { - waterMarkLow = new PromiseResolver(); - await waterMarkLow.promise; - } - - // `controller.enqueue` will throw error for us - // if the stream is already errored. - controller.enqueue(chunk); - }, - close() { - controller.close(); - }, - error(e) { - controller.error(e); - }, - }); - }, - pull: () => { - waterMarkLow?.resolve(); - }, - cancel: async (reason) => { - canceled.abort(reason); - waterMarkLow?.reject(reason); - }, - }, strategy); - } -} diff --git a/libraries/adb/src/utils/base64.spec.ts b/libraries/adb/src/utils/base64.spec.ts index 27645a60..46efab60 100644 --- a/libraries/adb/src/utils/base64.spec.ts +++ b/libraries/adb/src/utils/base64.spec.ts @@ -1,3 +1,5 @@ +import { describe, expect, it } from '@jest/globals'; + import { calculateBase64EncodedLength, decodeBase64, encodeBase64 } from './base64.js'; describe('base64', () => { @@ -117,7 +119,7 @@ describe('base64', () => { let inputIndex = inputOffset + input.length - 1; let outputIndex = outputOffset + correct.length - 1; - const paddingLength = correct.filter(x => x === "=".charCodeAt(0)).length; + const paddingLength = correct.filter(x => x === '='.charCodeAt(0)).length; if (paddingLength !== 0) { inputIndex -= (3 - paddingLength); outputIndex -= 4; diff --git a/libraries/android-bin/package.json b/libraries/android-bin/package.json index 99c899e7..c5d98ba1 100644 --- a/libraries/android-bin/package.json +++ b/libraries/android-bin/package.json @@ -35,6 +35,7 @@ }, "dependencies": { "@yume-chan/adb": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "@yume-chan/struct": "^0.0.16", "tslib": "^2.3.1" } diff --git a/libraries/android-bin/src/bug-report.ts b/libraries/android-bin/src/bug-report.ts index b4d7c78e..6ddd7519 100644 --- a/libraries/android-bin/src/bug-report.ts +++ b/libraries/android-bin/src/bug-report.ts @@ -1,7 +1,8 @@ // cspell: ignore bugreport // cspell: ignore bugreportz -import { AdbCommandBase, AdbSubprocessShellProtocol, DecodeUtf8Stream, PushReadableStream, ReadableStream, SplitLineStream, WrapReadableStream, WritableStream } from "@yume-chan/adb"; +import { AdbCommandBase, AdbSubprocessShellProtocol } from '@yume-chan/adb'; +import { DecodeUtf8Stream, PushReadableStream, ReadableStream, SplitStringStream, WrapReadableStream, WritableStream } from '@yume-chan/stream-extra'; export interface BugReportZVersion { major: number; @@ -78,7 +79,7 @@ export class BugReportZ extends AdbCommandBase { await process.stdout .pipeThrough(new DecodeUtf8Stream()) - .pipeThrough(new SplitLineStream()) + .pipeThrough(new SplitStringStream('\n')) .pipeTo(new WritableStream({ write(line) { // `BEGIN:` and `PROGRESS:` only appear when `-p` is specified. diff --git a/libraries/android-bin/src/logcat.ts b/libraries/android-bin/src/logcat.ts index d9b57c8c..0740a932 100644 --- a/libraries/android-bin/src/logcat.ts +++ b/libraries/android-bin/src/logcat.ts @@ -1,7 +1,8 @@ // cspell: ignore logcat -import { AdbCommandBase, AdbSubprocessNoneProtocol, BufferedStream, BufferedStreamEndedError, DecodeUtf8Stream, ReadableStream, SplitLineStream, WritableStream } from "@yume-chan/adb"; -import Struct, { decodeUtf8, StructAsyncDeserializeStream } from "@yume-chan/struct"; +import { AdbCommandBase, AdbSubprocessNoneProtocol } from '@yume-chan/adb'; +import { BufferedStream, BufferedStreamEndedError, DecodeUtf8Stream, ReadableStream, SplitStringStream, WritableStream } from '@yume-chan/stream-extra'; +import Struct, { decodeUtf8, StructAsyncDeserializeStream } from '@yume-chan/struct'; // `adb logcat` is an alias to `adb shell logcat` // so instead of adding to core library, it's implemented here @@ -144,7 +145,7 @@ export class Logcat extends AdbCommandBase { const result: LogSize[] = []; await stdout .pipeThrough(new DecodeUtf8Stream()) - .pipeThrough(new SplitLineStream()) + .pipeThrough(new SplitStringStream('\n')) .pipeTo(new WritableStream({ write(chunk) { let match = chunk.match(Logcat.LOG_SIZE_REGEX_11); diff --git a/libraries/scrcpy/package.json b/libraries/scrcpy/package.json index 4867a123..55f38411 100644 --- a/libraries/scrcpy/package.json +++ b/libraries/scrcpy/package.json @@ -38,6 +38,7 @@ "@yume-chan/adb": "^0.0.16", "@yume-chan/async": "^2.1.4", "@yume-chan/event": "^0.0.16", + "@yume-chan/stream-extra": "^0.0.16", "@yume-chan/struct": "^0.0.16", "tslib": "^2.3.1" }, diff --git a/libraries/scrcpy/src/adb-client.ts b/libraries/scrcpy/src/adb-client.ts index 0a3ec5ad..11baed79 100644 --- a/libraries/scrcpy/src/adb-client.ts +++ b/libraries/scrcpy/src/adb-client.ts @@ -1,37 +1,8 @@ -import { AdbCommandBase, AdbSubprocessNoneProtocol, AdbSubprocessProtocol, AdbSync, DecodeUtf8Stream, ReadableStream, TransformStream, WrapWritableStream, WritableStream } from "@yume-chan/adb"; -import { ScrcpyClient } from "./client.js"; -import { DEFAULT_SERVER_PATH, type ScrcpyOptions } from "./options/index.js"; +import { AdbCommandBase, AdbSubprocessNoneProtocol, AdbSubprocessProtocol, AdbSync } from '@yume-chan/adb'; +import { DecodeUtf8Stream, ReadableStream, SplitStringStream, WrapWritableStream, WritableStream } from '@yume-chan/stream-extra'; -function* splitLines(text: string): Generator { - let start = 0; - - while (true) { - const index = text.indexOf('\n', start); - if (index === -1) { - return; - } - - const line = text.substring(start, index); - yield line; - - start = index + 1; - } -} - -class SplitLinesStream extends TransformStream{ - constructor() { - super({ - transform(chunk, controller) { - for (const line of splitLines(chunk)) { - if (line === '') { - continue; - } - controller.enqueue(line); - } - }, - }); - } -} +import { ScrcpyClient } from './client.js'; +import { DEFAULT_SERVER_PATH, type ScrcpyOptions } from './options/index.js'; class ArrayToStream extends ReadableStream{ private array!: T[]; @@ -135,7 +106,7 @@ export class AdbScrcpyClient extends AdbCommandBase { const stdout = process.stdout .pipeThrough(new DecodeUtf8Stream()) - .pipeThrough(new SplitLinesStream()); + .pipeThrough(new SplitStringStream('\n')); // Read stdout, otherwise `process.exit` won't resolve. const output: string[] = []; diff --git a/libraries/scrcpy/src/client.ts b/libraries/scrcpy/src/client.ts index 9058a5a6..b9c647ac 100644 --- a/libraries/scrcpy/src/client.ts +++ b/libraries/scrcpy/src/client.ts @@ -1,8 +1,9 @@ -import { AbortController, BufferedStream, InspectStream, ReadableStream, ReadableWritablePair, TransformStream, type WritableStreamDefaultWriter } from '@yume-chan/adb'; import { EventEmitter } from '@yume-chan/event'; +import { AbortController, BufferedStream, InspectStream, ReadableStream, ReadableWritablePair, TransformStream, type WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import Struct from '@yume-chan/struct'; + import { AndroidMotionEventAction, ScrcpyControlMessageType, ScrcpyInjectKeyCodeControlMessage, ScrcpyInjectTextControlMessage, ScrcpyInjectTouchControlMessage, ScrcpySimpleControlMessage, type AndroidKeyEventAction } from './message.js'; -import type { ScrcpyInjectScrollControlMessage1_22, ScrcpyOptions, VideoStreamPacket } from "./options/index.js"; +import type { ScrcpyInjectScrollControlMessage1_22, ScrcpyOptions, VideoStreamPacket } from './options/index.js'; const ClipboardMessage = new Struct() diff --git a/libraries/scrcpy/src/connection.ts b/libraries/scrcpy/src/connection.ts index 0a849f7b..577356fa 100644 --- a/libraries/scrcpy/src/connection.ts +++ b/libraries/scrcpy/src/connection.ts @@ -1,7 +1,9 @@ -import type { Adb, ReadableStream, ReadableWritablePair } from "@yume-chan/adb"; -import type { Disposable } from "@yume-chan/event"; -import type { ValueOrPromise } from "@yume-chan/struct"; -import { delay } from "./utils.js"; +import type { Adb } from '@yume-chan/adb'; +import type { Disposable } from '@yume-chan/event'; +import type { ReadableStream, ReadableWritablePair } from '@yume-chan/stream-extra'; +import type { ValueOrPromise } from '@yume-chan/struct'; + +import { delay } from './utils.js'; export interface ScrcpyClientConnectionOptions { control: boolean; diff --git a/libraries/scrcpy/src/decoder/index.ts b/libraries/scrcpy/src/decoder/index.ts index 0f4029b0..3accd5a4 100644 --- a/libraries/scrcpy/src/decoder/index.ts +++ b/libraries/scrcpy/src/decoder/index.ts @@ -1,3 +1,3 @@ -export * from './types.js'; export * from './tinyh264/index.js'; +export * from './types.js'; export * from './web-codecs/index.js'; diff --git a/libraries/scrcpy/src/decoder/tinyh264/index.ts b/libraries/scrcpy/src/decoder/tinyh264/index.ts index 4bbf6292..6ba1b074 100644 --- a/libraries/scrcpy/src/decoder/tinyh264/index.ts +++ b/libraries/scrcpy/src/decoder/tinyh264/index.ts @@ -1,9 +1,10 @@ -import { WritableStream } from "@yume-chan/adb"; -import { PromiseResolver } from "@yume-chan/async"; -import { AndroidCodecLevel, AndroidCodecProfile } from "../../codec.js"; -import type { VideoStreamPacket } from "../../options/index.js"; -import type { H264Configuration, H264Decoder } from "../types.js"; -import { createTinyH264Wrapper, type TinyH264Wrapper } from "./wrapper.js"; +import { PromiseResolver } from '@yume-chan/async'; +import { WritableStream } from '@yume-chan/stream-extra'; + +import { AndroidCodecLevel, AndroidCodecProfile } from '../../codec.js'; +import type { VideoStreamPacket } from '../../options/index.js'; +import type { H264Configuration, H264Decoder } from '../types.js'; +import { createTinyH264Wrapper, type TinyH264Wrapper } from './wrapper.js'; let cachedInitializePromise: Promise<{ YuvBuffer: typeof import('yuv-buffer'), YuvCanvas: typeof import('yuv-canvas').default; }> | undefined; function initialize() { diff --git a/libraries/scrcpy/src/decoder/tinyh264/worker.ts b/libraries/scrcpy/src/decoder/tinyh264/worker.ts index b50e9507..879684ae 100644 --- a/libraries/scrcpy/src/decoder/tinyh264/worker.ts +++ b/libraries/scrcpy/src/decoder/tinyh264/worker.ts @@ -1,2 +1,3 @@ import { init } from 'tinyh264'; + init(); diff --git a/libraries/scrcpy/src/decoder/types.ts b/libraries/scrcpy/src/decoder/types.ts index 4dc0fe78..df3ca354 100644 --- a/libraries/scrcpy/src/decoder/types.ts +++ b/libraries/scrcpy/src/decoder/types.ts @@ -1,7 +1,8 @@ -import type { WritableStream } from '@yume-chan/adb'; -import type { Disposable } from "@yume-chan/event"; -import type { AndroidCodecLevel, AndroidCodecProfile } from "../codec.js"; -import type { VideoStreamPacket } from "../options/index.js"; +import type { Disposable } from '@yume-chan/event'; +import type { WritableStream } from '@yume-chan/stream-extra'; + +import type { AndroidCodecLevel, AndroidCodecProfile } from '../codec.js'; +import type { VideoStreamPacket } from '../options/index.js'; export interface H264Configuration { profileIndex: number; diff --git a/libraries/scrcpy/src/decoder/web-codecs/index.ts b/libraries/scrcpy/src/decoder/web-codecs/index.ts index eb73b304..bd574083 100644 --- a/libraries/scrcpy/src/decoder/web-codecs/index.ts +++ b/libraries/scrcpy/src/decoder/web-codecs/index.ts @@ -1,6 +1,7 @@ -import { WritableStream } from '@yume-chan/adb'; -import type { VideoStreamPacket } from "../../options/index.js"; -import type { H264Configuration, H264Decoder } from "../types.js"; +import { WritableStream } from '@yume-chan/stream-extra'; + +import type { VideoStreamPacket } from '../../options/index.js'; +import type { H264Configuration, H264Decoder } from '../types.js'; function toHex(value: number) { return value.toString(16).padStart(2, '0').toUpperCase(); diff --git a/libraries/scrcpy/src/message.ts b/libraries/scrcpy/src/message.ts index 0ff1a114..31a70ebb 100644 --- a/libraries/scrcpy/src/message.ts +++ b/libraries/scrcpy/src/message.ts @@ -2,6 +2,7 @@ import Struct, { placeholder } from '@yume-chan/struct'; // https://github.com/Genymobile/scrcpy/blob/fa5b2a29e983a46b49531def9cf3d80c40c3de37/app/src/control_msg.h#L23 // For their message bodies, see https://github.com/Genymobile/scrcpy/blob/5c62f3419d252d10cd8c9cbb7c918b358b81f2d0/app/src/control_msg.c#L92 +// Their IDs change between versions, so always use `options.getControlMessageTypes()` export enum ScrcpyControlMessageType { InjectKeycode, InjectText, diff --git a/libraries/scrcpy/src/options/1_16/index.ts b/libraries/scrcpy/src/options/1_16/index.ts index fd3afdde..737ff3ef 100644 --- a/libraries/scrcpy/src/options/1_16/index.ts +++ b/libraries/scrcpy/src/options/1_16/index.ts @@ -1,12 +1,14 @@ -import { StructDeserializeStream, TransformStream, type Adb } from "@yume-chan/adb"; -import Struct from "@yume-chan/struct"; -import type { AndroidCodecLevel, AndroidCodecProfile } from "../../codec.js"; -import { ScrcpyClientConnection, ScrcpyClientForwardConnection, ScrcpyClientReverseConnection, type ScrcpyClientConnectionOptions } from "../../connection.js"; -import { AndroidKeyEventAction, ScrcpyControlMessageType, ScrcpySimpleControlMessage } from "../../message.js"; -import type { ScrcpyBackOrScreenOnEvent1_18 } from "../1_18.js"; -import type { ScrcpyInjectScrollControlMessage1_22 } from "../1_22.js"; -import { toScrcpyOptionValue, type ScrcpyOptions, type ScrcpyOptionValue, type VideoStreamPacket } from "../common.js"; -import { parse_sequence_parameter_set } from "./sps.js"; +import type { Adb } from '@yume-chan/adb'; +import { StructDeserializeStream, TransformStream } from '@yume-chan/stream-extra'; +import Struct from '@yume-chan/struct'; + +import type { AndroidCodecLevel, AndroidCodecProfile } from '../../codec.js'; +import { ScrcpyClientConnection, ScrcpyClientForwardConnection, ScrcpyClientReverseConnection, type ScrcpyClientConnectionOptions } from '../../connection.js'; +import { AndroidKeyEventAction, ScrcpyControlMessageType, ScrcpySimpleControlMessage } from '../../message.js'; +import type { ScrcpyBackOrScreenOnEvent1_18 } from '../1_18.js'; +import type { ScrcpyInjectScrollControlMessage1_22 } from '../1_22.js'; +import { toScrcpyOptionValue, type ScrcpyOptions, type ScrcpyOptionValue, type VideoStreamPacket } from '../common.js'; +import { parse_sequence_parameter_set } from './sps.js'; export enum ScrcpyLogLevel { Verbose = 'verbose', @@ -209,7 +211,8 @@ export class ScrcpyOptions1_16()); -export type ScrcpyBackOrScreenOnEvent1_18 = typeof ScrcpyBackOrScreenOnEvent1_18["TInit"]; +export type ScrcpyBackOrScreenOnEvent1_18 = typeof ScrcpyBackOrScreenOnEvent1_18['TInit']; export class ScrcpyOptions1_18 extends ScrcpyOptions1_16 { constructor(value: Partial) { diff --git a/libraries/scrcpy/src/options/1_21.ts b/libraries/scrcpy/src/options/1_21.ts index 77b6519b..fda35a2b 100644 --- a/libraries/scrcpy/src/options/1_21.ts +++ b/libraries/scrcpy/src/options/1_21.ts @@ -1,7 +1,7 @@ // cspell: ignore autosync import { ScrcpyOptions1_18, type ScrcpyOptionsInit1_18 } from './1_18.js'; -import { toScrcpyOptionValue } from "./common.js"; +import { toScrcpyOptionValue } from './common.js'; export interface ScrcpyOptionsInit1_21 extends ScrcpyOptionsInit1_18 { clipboardAutosync?: boolean; diff --git a/libraries/scrcpy/src/options/1_22.ts b/libraries/scrcpy/src/options/1_22.ts index 17a0015e..6ab13054 100644 --- a/libraries/scrcpy/src/options/1_22.ts +++ b/libraries/scrcpy/src/options/1_22.ts @@ -1,8 +1,9 @@ -import type { Adb } from "@yume-chan/adb"; -import Struct from "@yume-chan/struct"; -import { ScrcpyClientForwardConnection, ScrcpyClientReverseConnection, type ScrcpyClientConnection } from "../connection.js"; -import { ScrcpyInjectScrollControlMessage1_16 } from "./1_16/index.js"; -import { ScrcpyOptions1_21, type ScrcpyOptionsInit1_21 } from "./1_21.js"; +import type { Adb } from '@yume-chan/adb'; +import Struct from '@yume-chan/struct'; + +import { ScrcpyClientForwardConnection, ScrcpyClientReverseConnection, type ScrcpyClientConnection } from '../connection.js'; +import { ScrcpyInjectScrollControlMessage1_16 } from './1_16/index.js'; +import { ScrcpyOptions1_21, type ScrcpyOptionsInit1_21 } from './1_21.js'; export interface ScrcpyOptionsInit1_22 extends ScrcpyOptionsInit1_21 { downsizeOnError: boolean; @@ -32,9 +33,9 @@ export interface ScrcpyOptionsInit1_22 extends ScrcpyOptionsInit1_21 { export const ScrcpyInjectScrollControlMessage1_22 = new Struct() .fields(ScrcpyInjectScrollControlMessage1_16) - .int32("buttons"); + .int32('buttons'); -export type ScrcpyInjectScrollControlMessage1_22 = typeof ScrcpyInjectScrollControlMessage1_22["TInit"]; +export type ScrcpyInjectScrollControlMessage1_22 = typeof ScrcpyInjectScrollControlMessage1_22['TInit']; export class ScrcpyOptions1_22 extends ScrcpyOptions1_21 { public constructor(init: Partial) { @@ -58,15 +59,15 @@ export class ScrcpyOptions1_22 { it('input 3 small buffers 2', () => { return runTest([3, 3, 3], [7, 2]); }); - }); }); diff --git a/libraries/adb/src/stream/buffered.ts b/libraries/stream-extra/src/buffered.ts similarity index 85% rename from libraries/adb/src/stream/buffered.ts rename to libraries/stream-extra/src/buffered.ts index b6b8dd07..96fdad8a 100644 --- a/libraries/adb/src/stream/buffered.ts +++ b/libraries/stream-extra/src/buffered.ts @@ -1,7 +1,5 @@ -import type { StructAsyncDeserializeStream } from '@yume-chan/struct'; -import type { AdbSocket, AdbSocketInfo } from '../socket/index.js'; -import type { ReadableStream, ReadableStreamDefaultReader } from './detect.js'; -import { PushReadableStream } from "./transform.js"; +import { PushReadableStream } from "./push-readable.js"; +import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js"; export class BufferedStreamEndedError extends Error { public constructor() { @@ -148,21 +146,3 @@ export class BufferedStream { await this.reader.cancel(); } } - -export class AdbBufferedStream - extends BufferedStream - implements AdbSocketInfo, StructAsyncDeserializeStream { - protected readonly socket: AdbSocket; - - public get localId() { return this.socket.localId; } - public get remoteId() { return this.socket.remoteId; } - public get localCreated() { return this.socket.localCreated; } - public get serviceString() { return this.socket.serviceString; } - - public get writable() { return this.socket.writable; } - - public constructor(socket: AdbSocket) { - super(socket.readable); - this.socket = socket; - } -} diff --git a/libraries/stream-extra/src/chunk.ts b/libraries/stream-extra/src/chunk.ts new file mode 100644 index 00000000..82094c6b --- /dev/null +++ b/libraries/stream-extra/src/chunk.ts @@ -0,0 +1,15 @@ +import { TransformStream } from "./stream.js"; + +export class ChunkStream extends TransformStream{ + public constructor(size: number) { + super({ + transform(chunk, controller) { + for (let start = 0; start < chunk.byteLength;) { + const end = start + size; + controller.enqueue(chunk.subarray(start, end)); + start = end; + } + } + }); + } +} diff --git a/libraries/stream-extra/src/decode-utf8.ts b/libraries/stream-extra/src/decode-utf8.ts new file mode 100644 index 00000000..c432c846 --- /dev/null +++ b/libraries/stream-extra/src/decode-utf8.ts @@ -0,0 +1,12 @@ +import { decodeUtf8 } from '@yume-chan/struct'; +import { TransformStream } from "./stream.js"; + +export class DecodeUtf8Stream extends TransformStream{ + public constructor() { + super({ + transform(chunk, controller) { + controller.enqueue(decodeUtf8(chunk)); + }, + }); + } +} diff --git a/libraries/adb/src/stream/transform.spec.ts b/libraries/stream-extra/src/duplex.spec.ts similarity index 70% rename from libraries/adb/src/stream/transform.spec.ts rename to libraries/stream-extra/src/duplex.spec.ts index d942b324..be2ed165 100644 --- a/libraries/adb/src/stream/transform.spec.ts +++ b/libraries/stream-extra/src/duplex.spec.ts @@ -1,5 +1,6 @@ -import { ReadableStream } from "./detect.js"; -import { DuplexStreamFactory } from './transform.js'; +import { describe, it } from '@jest/globals'; +import { DuplexStreamFactory } from "./duplex.js"; +import { ReadableStream } from "./stream.js"; describe('DuplexStreamFactory', () => { it('should close all readable', async () => { diff --git a/libraries/stream-extra/src/duplex.ts b/libraries/stream-extra/src/duplex.ts new file mode 100644 index 00000000..76a6f6e0 --- /dev/null +++ b/libraries/stream-extra/src/duplex.ts @@ -0,0 +1,120 @@ +import { PromiseResolver } from "@yume-chan/async"; +import type { ValueOrPromise } from "@yume-chan/struct"; +import { WritableStream, type ReadableStream, type ReadableStreamDefaultController, type WritableStreamDefaultWriter } from "./stream.js"; +import { WrapReadableStream } from "./wrap-readable.js"; + +export interface DuplexStreamFactoryOptions { + /** + * Callback when any `ReadableStream` is cancelled (the user doesn't need any more data), + * or `WritableStream` is ended (the user won't produce any more data), + * or `DuplexStreamFactory#close` is called. + * + * Usually you want to let the other peer know that the duplex stream should be clsoed. + * + * `dispose` will automatically be called after `close` completes, + * but if you want to wait another peer for a close confirmation and call + * `DuplexStreamFactory#dispose` yourself, you can return `false` + * (or a `Promise` that resolves to `false`) to disable the automatic call. + */ + close?: (() => ValueOrPromise) | undefined; + + /** + * Callback when any `ReadableStream` is closed (the other peer doesn't produce any more data), + * or `WritableStream` is aborted (the other peer can't receive any more data), + * or `DuplexStreamFactory#abort` is called. + * + * Usually indicates the other peer has closed the duplex stream. You can clean up + * any resources you have allocated now. + */ + dispose?: (() => void | Promise) | undefined; +} + +/** + * A factory for creating a duplex stream. + * + * It can create multiple `ReadableStream`s and `WritableStream`s, + * when any of them is closed, all other streams will be closed as well. + */ +export class DuplexStreamFactory { + private readableControllers: ReadableStreamDefaultController[] = []; + private writers: WritableStreamDefaultWriter[] = []; + + private _writableClosed = false; + public get writableClosed() { return this._writableClosed; } + + private _closed = new PromiseResolver(); + public get closed() { return this._closed.promise; } + + private options: DuplexStreamFactoryOptions; + + public constructor(options?: DuplexStreamFactoryOptions) { + this.options = options ?? {}; + } + + public wrapReadable(readable: ReadableStream): WrapReadableStream { + return new WrapReadableStream({ + start: (controller) => { + this.readableControllers.push(controller); + return readable; + }, + cancel: async () => { + // cancel means the local peer closes the connection first. + await this.close(); + }, + close: async () => { + // stream end means the remote peer closed the connection first. + await this.dispose(); + }, + }); + } + + public createWritable(stream: WritableStream): WritableStream { + const writer = stream.getWriter(); + this.writers.push(writer); + + // `WritableStream` has no way to tell if the remote peer has closed the connection. + // So it only triggers `close`. + return new WritableStream({ + write: async (chunk) => { + await writer.ready; + await writer.write(chunk); + }, + abort: async (reason) => { + await writer.abort(reason); + await this.close(); + }, + close: async () => { + try { await writer.close(); } catch { } + await this.close(); + }, + }); + } + + public async close() { + if (this._writableClosed) { + return; + } + this._writableClosed = true; + + // Call `close` first, so it can still write data to `WritableStream`s. + if (await this.options.close?.() !== false) { + // `close` can return `false` to disable automatic `dispose`. + await this.dispose(); + } + + for (const writer of this.writers) { + try { await writer.close(); } catch { } + } + } + + public async dispose() { + this._writableClosed = true; + this._closed.resolve(); + + for (const controller of this.readableControllers) { + try { controller.close(); } catch { } + } + + await this.options.dispose?.(); + } +} diff --git a/libraries/stream-extra/src/gather-string.ts b/libraries/stream-extra/src/gather-string.ts new file mode 100644 index 00000000..db25f9bf --- /dev/null +++ b/libraries/stream-extra/src/gather-string.ts @@ -0,0 +1,15 @@ +import { WritableStream } from "./stream.js"; + +export class GatherStringStream extends WritableStream{ + // PERF: rope (concat strings) is faster than `[].join('')` + private _result = ''; + public get result() { return this._result; } + + public constructor() { + super({ + write: (chunk) => { + this._result += chunk; + }, + }); + } +} diff --git a/libraries/stream-extra/src/index.ts b/libraries/stream-extra/src/index.ts new file mode 100644 index 00000000..d27a3d38 --- /dev/null +++ b/libraries/stream-extra/src/index.ts @@ -0,0 +1,14 @@ +export * from './buffered.js'; +export * from './chunk.js'; +export * from './decode-utf8.js'; +export * from './duplex.js'; +export * from './gather-string.js'; +export * from './inspect.js'; +export * from './pipe-from.js'; +export * from './push-readable.js'; +export * from './split-string.js'; +export * from './stream.js'; +export * from './struct-deserialize.js'; +export * from './struct-serialize.js'; +export * from './wrap-readable.js'; +export * from './wrap-writable.js'; diff --git a/libraries/stream-extra/src/inspect.ts b/libraries/stream-extra/src/inspect.ts new file mode 100644 index 00000000..d7aa94e0 --- /dev/null +++ b/libraries/stream-extra/src/inspect.ts @@ -0,0 +1,12 @@ +import { TransformStream } from "./stream.js"; + +export class InspectStream extends TransformStream { + constructor(callback: (value: T) => void) { + super({ + transform(chunk, controller) { + callback(chunk); + controller.enqueue(chunk); + } + }); + } +} diff --git a/libraries/adb/src/stream/detect.native.ts b/libraries/stream-extra/src/native.ts similarity index 100% rename from libraries/adb/src/stream/detect.native.ts rename to libraries/stream-extra/src/native.ts diff --git a/libraries/stream-extra/src/pipe-from.ts b/libraries/stream-extra/src/pipe-from.ts new file mode 100644 index 00000000..9769ad91 --- /dev/null +++ b/libraries/stream-extra/src/pipe-from.ts @@ -0,0 +1,27 @@ +import { WritableStream, type ReadableWritablePair } from "./stream.js"; + +/** + * Create a new `WritableStream` that, when written to, will write that chunk to + * `pair.writable`, when pipe `pair.readable` to `writable`. + * + * It's the opposite of `ReadableStream.pipeThrough`. + * + * @param writable The `WritableStream` to write to. + * @param pair A `TransformStream` that converts chunks. + * @returns A new `WritableStream`. + */ +export function pipeFrom(writable: WritableStream, pair: ReadableWritablePair) { + const writer = pair.writable.getWriter(); + const pipe = pair.readable + .pipeTo(writable); + return new WritableStream({ + async write(chunk) { + await writer.ready; + await writer.write(chunk); + }, + async close() { + await writer.close(); + await pipe; + } + }); +} diff --git a/libraries/stream-extra/src/push-readable.ts b/libraries/stream-extra/src/push-readable.ts new file mode 100644 index 00000000..0592c63c --- /dev/null +++ b/libraries/stream-extra/src/push-readable.ts @@ -0,0 +1,62 @@ +import { PromiseResolver } from '@yume-chan/async'; +import { AbortController, AbortSignal, QueuingStrategy, ReadableStream } from "./stream.js"; + +export interface PushReadableStreamController { + abortSignal: AbortSignal; + + enqueue(chunk: T): Promise; + + close(): void; + + error(e?: any): void; +} + +export type PushReadableStreamSource = (controller: PushReadableStreamController) => void; + +export class PushReadableStream extends ReadableStream { + public constructor(source: PushReadableStreamSource, strategy?: QueuingStrategy) { + let waterMarkLow: PromiseResolver | undefined; + const canceled: AbortController = new AbortController(); + + super({ + start: (controller) => { + source({ + abortSignal: canceled.signal, + async enqueue(chunk) { + if (canceled.signal.aborted) { + // If the stream is already cancelled, + // throw immediately. + throw canceled.signal.reason ?? new Error('Aborted'); + } + + // Only when the stream is errored, `desiredSize` will be `null`. + // But since `null <= 0` is `true` + // (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`), + // not handling it will cause a deadlock. + if ((controller.desiredSize ?? 1) <= 0) { + waterMarkLow = new PromiseResolver(); + await waterMarkLow.promise; + } + + // `controller.enqueue` will throw error for us + // if the stream is already errored. + controller.enqueue(chunk); + }, + close() { + controller.close(); + }, + error(e) { + controller.error(e); + }, + }); + }, + pull: () => { + waterMarkLow?.resolve(); + }, + cancel: async (reason) => { + canceled.abort(reason); + waterMarkLow?.reject(reason); + }, + }, strategy); + } +} diff --git a/libraries/stream-extra/src/split-string.ts b/libraries/stream-extra/src/split-string.ts new file mode 100644 index 00000000..e04e6013 --- /dev/null +++ b/libraries/stream-extra/src/split-string.ts @@ -0,0 +1,29 @@ +import { TransformStream } from "./stream.js"; + +function* split(input: string, separator: string): Generator { + let start = 0; + + while (true) { + const index = input.indexOf(separator, start); + if (index === -1) { + return; + } + + const part = input.substring(start, index); + yield part; + + start = index + 1; + } +} + +export class SplitStringStream extends TransformStream { + public constructor(separator: string) { + super({ + transform(chunk, controller) { + for (const part of split(chunk, separator)) { + controller.enqueue(part); + } + } + }); + } +} diff --git a/libraries/adb/src/stream/detect.polyfill.ts b/libraries/stream-extra/src/stream.ts similarity index 100% rename from libraries/adb/src/stream/detect.polyfill.ts rename to libraries/stream-extra/src/stream.ts diff --git a/libraries/stream-extra/src/struct-deserialize.ts b/libraries/stream-extra/src/struct-deserialize.ts new file mode 100644 index 00000000..d797196c --- /dev/null +++ b/libraries/stream-extra/src/struct-deserialize.ts @@ -0,0 +1,52 @@ +import type Struct from "@yume-chan/struct"; +import type { StructValueType } from "@yume-chan/struct"; +import { BufferedStream, BufferedStreamEndedError } from "./buffered.js"; +import { PushReadableStream, PushReadableStreamController } from "./push-readable.js"; +import { ReadableStream, WritableStream, type ReadableWritablePair } from "./stream.js"; + +// TODO: StructTransformStream: Looking for better implementation +export class StructDeserializeStream> + implements ReadableWritablePair>{ + private _readable: ReadableStream>; + public get readable() { return this._readable; } + + private _writable: WritableStream; + public get writable() { return this._writable; } + + public constructor(struct: T) { + // Convert incoming chunks to a `BufferedStream` + let incomingStreamController!: PushReadableStreamController; + const incomingStream = new BufferedStream( + new PushReadableStream( + controller => incomingStreamController = controller, + ) + ); + + this._readable = new ReadableStream>({ + async pull(controller) { + try { + const value = await struct.deserialize(incomingStream); + controller.enqueue(value); + } catch (e) { + if (e instanceof BufferedStreamEndedError) { + controller.close(); + return; + } + throw e; + } + } + }); + + this._writable = new WritableStream({ + async write(chunk) { + await incomingStreamController.enqueue(chunk); + }, + abort() { + incomingStreamController.close(); + }, + close() { + incomingStreamController.close(); + }, + }); + } +} diff --git a/libraries/stream-extra/src/struct-serialize.ts b/libraries/stream-extra/src/struct-serialize.ts new file mode 100644 index 00000000..8c3948a0 --- /dev/null +++ b/libraries/stream-extra/src/struct-serialize.ts @@ -0,0 +1,13 @@ +import type Struct from "@yume-chan/struct"; +import { TransformStream } from "./stream.js"; + +export class StructSerializeStream> + extends TransformStream{ + constructor(struct: T) { + super({ + transform(chunk, controller) { + controller.enqueue(struct.serialize(chunk)); + }, + }); + } +} diff --git a/libraries/stream-extra/src/wrap-readable.ts b/libraries/stream-extra/src/wrap-readable.ts new file mode 100644 index 00000000..9db6c3ea --- /dev/null +++ b/libraries/stream-extra/src/wrap-readable.ts @@ -0,0 +1,70 @@ +import type { ValueOrPromise } from "@yume-chan/struct"; +import { ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader } from "./stream.js"; + +export type WrapReadableStreamStart = (controller: ReadableStreamDefaultController) => ValueOrPromise>; + +export interface ReadableStreamWrapper { + start: WrapReadableStreamStart; + cancel?(reason?: any): ValueOrPromise; + close?(): ValueOrPromise; +} + +function getWrappedReadableStream( + wrapper: ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper, + controller: ReadableStreamDefaultController +) { + if ('start' in wrapper) { + return wrapper.start(controller); + } else if (typeof wrapper === 'function') { + return wrapper(controller); + } else { + // Can't use `wrapper instanceof ReadableStream` + // Because we want to be compatible with any ReadableStream-like objects + return wrapper; + } +} + +/** + * This class has multiple usages: + * + * 1. Get notified when the stream is cancelled or closed. + * 2. Synchronously create a `ReadableStream` by asynchronously return another `ReadableStream`. + * 3. Convert native `ReadableStream`s to polyfilled ones so they can `pipe` between. + */ +export class WrapReadableStream extends ReadableStream{ + public readable!: ReadableStream; + + private reader!: ReadableStreamDefaultReader; + + public constructor(wrapper: ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper) { + super({ + start: async (controller) => { + // `start` is invoked before `ReadableStream`'s constructor finish, + // so using `this` synchronously causes + // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". + // Queue a microtask to avoid this. + await Promise.resolve(); + + this.readable = await getWrappedReadableStream(wrapper, controller); + this.reader = this.readable.getReader(); + }, + cancel: async (reason) => { + await this.reader.cancel(reason); + if ('cancel' in wrapper) { + await wrapper.cancel?.(reason); + } + }, + pull: async (controller) => { + const result = await this.reader.read(); + if (result.done) { + controller.close(); + if ('close' in wrapper) { + await wrapper.close?.(); + } + } else { + controller.enqueue(result.value); + } + } + }); + } +} diff --git a/libraries/stream-extra/src/wrap-writable.ts b/libraries/stream-extra/src/wrap-writable.ts new file mode 100644 index 00000000..f89b6a21 --- /dev/null +++ b/libraries/stream-extra/src/wrap-writable.ts @@ -0,0 +1,65 @@ +import type { ValueOrPromise } from "@yume-chan/struct"; +import { WritableStream, WritableStreamDefaultWriter } from "./stream.js"; + +export type WrapWritableStreamStart = () => ValueOrPromise>; + +export interface WritableStreamWrapper { + start: WrapWritableStreamStart; + close?(): Promise; +} + +async function getWrappedWritableStream( + wrapper: WritableStream | WrapWritableStreamStart | WritableStreamWrapper +) { + if ('start' in wrapper) { + return await wrapper.start(); + } else if (typeof wrapper === 'function') { + return await wrapper(); + } else { + // Can't use `wrapper instanceof WritableStream` + // Because we want to be compatible with any WritableStream-like objects + return wrapper; + } +} + +export class WrapWritableStream extends WritableStream { + public writable!: WritableStream; + + private writer!: WritableStreamDefaultWriter; + + public constructor(wrapper: WritableStream | WrapWritableStreamStart | WritableStreamWrapper) { + super({ + start: async () => { + // `start` is invoked before `ReadableStream`'s constructor finish, + // so using `this` synchronously causes + // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". + // Queue a microtask to avoid this. + await Promise.resolve(); + + this.writable = await getWrappedWritableStream(wrapper); + this.writer = this.writable.getWriter(); + }, + write: async (chunk) => { + // Maintain back pressure + await this.writer.ready; + await this.writer.write(chunk); + }, + abort: async (reason) => { + await this.writer.abort(reason); + if ('close' in wrapper) { + await wrapper.close?.(); + } + }, + close: async () => { + // Close the inner stream first. + // Usually the inner stream is a logical sub-stream over the outer stream, + // closing the outer stream first will make the inner stream incapable of + // sending data in its `close` handler. + await this.writer.close(); + if ('close' in wrapper) { + await wrapper.close?.(); + } + }, + }); + } +} diff --git a/libraries/stream-extra/tsconfig.build.json b/libraries/stream-extra/tsconfig.build.json new file mode 100644 index 00000000..b4bb73dd --- /dev/null +++ b/libraries/stream-extra/tsconfig.build.json @@ -0,0 +1,3 @@ +{ + "extends": "./node_modules/@yume-chan/ts-package-builder/tsconfig.base.json" +} diff --git a/libraries/stream-extra/tsconfig.json b/libraries/stream-extra/tsconfig.json new file mode 100644 index 00000000..85fc5a7a --- /dev/null +++ b/libraries/stream-extra/tsconfig.json @@ -0,0 +1,10 @@ +{ + "references": [ + { + "path": "./tsconfig.test.json" + }, + { + "path": "./tsconfig.build.json" + }, + ] +} diff --git a/libraries/stream-extra/tsconfig.test.json b/libraries/stream-extra/tsconfig.test.json new file mode 100644 index 00000000..e987f757 --- /dev/null +++ b/libraries/stream-extra/tsconfig.test.json @@ -0,0 +1,8 @@ +{ + "extends": "./tsconfig.build.json", + "compilerOptions": { + "types": [ + ], + }, + "exclude": [] +} diff --git a/rush.json b/rush.json index bbebdb40..3aa883ab 100644 --- a/rush.json +++ b/rush.json @@ -489,6 +489,11 @@ "projectFolder": "libraries/scrcpy", "versionPolicyName": "adb" }, + { + "packageName": "@yume-chan/stream-extra", + "projectFolder": "libraries/stream-extra", + "versionPolicyName": "adb" + }, { "packageName": "demo", "projectFolder": "apps/demo"