From 8a521c8d93359c68b7adf17f4e08bce8720df94c Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Sun, 3 Apr 2022 12:49:38 +0800 Subject: [PATCH] refactor(adb): let backends deserialize packets by themselves for better optimization --- apps/demo/components/connect.tsx | 14 ++----- .../adb-backend-direct-sockets/src/index.ts | 9 ++-- libraries/adb-backend-webusb/package.json | 1 + libraries/adb-backend-webusb/src/backend.ts | 42 +++++++++++-------- libraries/adb-backend-webusb/tsconfig.json | 3 ++ libraries/adb-backend-ws/src/index.ts | 7 +++- libraries/adb/src/adb.ts | 22 ++-------- libraries/adb/src/backend.ts | 3 +- libraries/adb/src/commands/reverse.ts | 4 +- libraries/adb/src/socket/dispatcher.ts | 12 +++--- libraries/adb/src/stream/transform.ts | 14 +++---- 11 files changed, 61 insertions(+), 70 deletions(-) diff --git a/apps/demo/components/connect.tsx b/apps/demo/components/connect.tsx index 3764a502..5af8c034 100644 --- a/apps/demo/components/connect.tsx +++ b/apps/demo/components/connect.tsx @@ -158,25 +158,17 @@ function _Connect(): JSX.Element | null { try { setConnecting(true); - const dataStreamPair = await selectedBackend.connect(); - - const packetStreamPair = Adb.createConnection({ - readable: dataStreamPair.readable - .pipeThrough(new InspectStream(chunk => { - byteInAcc.current += chunk.byteLength; - })), - writable: dataStreamPair.writable, - }); + const streams = await selectedBackend.connect(); // Use `TransformStream` to intercept packets and log them - const readable = packetStreamPair.readable + const readable = streams.readable .pipeThrough( new InspectStream(packet => { globalState.appendLog('Incoming', packet); }) ); const writable = pipeFrom( - packetStreamPair.writable, + streams.writable, new InspectStream(packet => { globalState.appendLog('Outgoing', packet); }) diff --git a/libraries/adb-backend-direct-sockets/src/index.ts b/libraries/adb-backend-direct-sockets/src/index.ts index c0873a9c..0cfa251f 100644 --- a/libraries/adb-backend-direct-sockets/src/index.ts +++ b/libraries/adb-backend-direct-sockets/src/index.ts @@ -1,4 +1,4 @@ -import { AdbBackend, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/adb'; +import { AdbBackend, AdbPacket, AdbPacketSerializeStream, pipeFrom, ReadableStream, StructDeserializeStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/adb'; declare global { interface TCPSocket { @@ -55,6 +55,7 @@ export default class AdbDirectSocketsBackend implements AdbBackend { remotePort: this.port, noDelay: true, }); + // Native streams can't `pipeTo()` or `pipeThrough()` polyfilled streams, so we need to wrap them return { readable: new WrapReadableStream, void>({ @@ -64,15 +65,15 @@ export default class AdbDirectSocketsBackend implements AdbBackend { state: undefined, }; } - }), - writable: new WrapWritableStream({ + }).pipeThrough(new StructDeserializeStream(AdbPacket)), + writable: pipeFrom(new WrapWritableStream({ async start() { return { writable, state: undefined, }; } - }), + }), new AdbPacketSerializeStream()), }; } } diff --git a/libraries/adb-backend-webusb/package.json b/libraries/adb-backend-webusb/package.json index c301b1e1..34c18208 100644 --- a/libraries/adb-backend-webusb/package.json +++ b/libraries/adb-backend-webusb/package.json @@ -33,6 +33,7 @@ "dependencies": { "@types/w3c-web-usb": "^1.0.4", "@yume-chan/adb": "^0.0.10", + "@yume-chan/struct": "^0.0.10", "tslib": "^2.3.1" }, "devDependencies": { diff --git a/libraries/adb-backend-webusb/src/backend.ts b/libraries/adb-backend-webusb/src/backend.ts index d7bff01f..ed9fb4d8 100644 --- a/libraries/adb-backend-webusb/src/backend.ts +++ b/libraries/adb-backend-webusb/src/backend.ts @@ -1,4 +1,5 @@ -import { DuplexStreamFactory, type AdbBackend, type ReadableStream, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb'; +import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, type AdbBackend, type AdbPacketCore, type AdbPacketInit, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb'; +import type { StructAsyncDeserializeStream } from "@yume-chan/struct"; export const WebUsbDeviceFilter: USBDeviceFilter = { classCode: 0xFF, @@ -6,15 +7,15 @@ export const WebUsbDeviceFilter: USBDeviceFilter = { protocolCode: 1, }; -export class AdbWebUsbBackendStream implements ReadableWritablePair{ - private _readable: ReadableStream; +export class AdbWebUsbBackendStream implements ReadableWritablePair{ + private _readable: ReadableStream; public get readable() { return this._readable; } - private _writable: WritableStream; + private _writable: WritableStream; public get writable() { return this._writable; } public constructor(device: USBDevice, inEndpoint: USBEndpoint, outEndpoint: USBEndpoint) { - const factory = new DuplexStreamFactory({ + const factory = new DuplexStreamFactory({ close: async () => { navigator.usb.removeEventListener('disconnect', handleUsbDisconnect); try { @@ -33,9 +34,13 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair { - const result = await device.transferIn(inEndpoint.endpointNumber, inEndpoint.packetSize); + const incomingStream: StructAsyncDeserializeStream = { + async read(length) { + // `ReadableStream` don't know how many bytes the consumer need in each `pull`, + // But `transferIn(endpointNumber, packetSize)` is much slower than `transferIn(endpointNumber, length)` + // So `AdbBackend` is refactored to use `ReadableStream` directly, + // (let each backend deserialize the packets in their own way) + const result = await device.transferIn(inEndpoint.endpointNumber, length); // `USBTransferResult` has three states: "ok", "stall" and "babble", // but ADBd on Android won't enter "stall" (halt) state, @@ -44,22 +49,25 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair({ + async pull(controller) { + const value = await AdbPacket.deserialize(incomingStream); + controller.enqueue(value); + }, + })); + + this._writable = pipeFrom(factory.createWritable({ write: async (chunk) => { await device.transferOut(outEndpoint.endpointNumber, chunk); }, }, { highWaterMark: 16 * 1024, size(chunk) { return chunk.byteLength; }, - }); + }), new AdbPacketSerializeStream()); } } diff --git a/libraries/adb-backend-webusb/tsconfig.json b/libraries/adb-backend-webusb/tsconfig.json index a2965de0..61c34fb9 100644 --- a/libraries/adb-backend-webusb/tsconfig.json +++ b/libraries/adb-backend-webusb/tsconfig.json @@ -12,6 +12,9 @@ "references": [ { "path": "../adb/tsconfig.build.json" + }, + { + "path": "../struct/tsconfig.json" } ] } diff --git a/libraries/adb-backend-ws/src/index.ts b/libraries/adb-backend-ws/src/index.ts index 27b7c045..fadf47bb 100644 --- a/libraries/adb-backend-ws/src/index.ts +++ b/libraries/adb-backend-ws/src/index.ts @@ -1,4 +1,4 @@ -import { DuplexStreamFactory, type AdbBackend } from '@yume-chan/adb'; +import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb'; export default class AdbWsBackend implements AdbBackend { public readonly serial: string; @@ -51,6 +51,9 @@ export default class AdbWsBackend implements AdbBackend { size(chunk) { return chunk.byteLength; }, }); - return { readable, writable }; + return { + readable: readable.pipeThrough(new StructDeserializeStream(AdbPacket)), + writable: pipeFrom(writable, new AdbPacketSerializeStream()), + }; } } diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index ebd83ce7..3bf1bbf8 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -2,9 +2,9 @@ import { PromiseResolver } from '@yume-chan/async'; import { AdbAuthenticationHandler, AdbDefaultAuthenticators, type AdbCredentialStore } from './auth.js'; import { AdbPower, AdbReverseCommand, AdbSubprocess, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install, type AdbFrameBuffer } from './commands/index.js'; import { AdbFeatures } from './features.js'; -import { AdbCommand, AdbPacket, AdbPacketSerializeStream, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from './packet.js'; +import { AdbCommand, AdbPacket, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from './packet.js'; import { AdbPacketDispatcher, AdbSocket } from './socket/index.js'; -import { AbortController, DecodeUtf8Stream, GatherStringStream, pipeFrom, StructDeserializeStream, WritableStream, type ReadableWritablePair } from "./stream/index.js"; +import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from "./stream/index.js"; import { decodeUtf8, encodeUtf8 } from "./utils/index.js"; export enum AdbPropKey { @@ -17,27 +17,13 @@ export enum AdbPropKey { export const VERSION_OMIT_CHECKSUM = 0x01000001; export class Adb { - public static createConnection( - connection: ReadableWritablePair - ): ReadableWritablePair { - return { - readable: connection.readable.pipeThrough( - new StructDeserializeStream(AdbPacket) - ), - writable: pipeFrom( - connection.writable, - new AdbPacketSerializeStream() - ), - }; - } - /** * It's possible to call `authenticate` multiple times on a single connection, * every time the device receives a `CNXN` packet it will reset its internal state, * and begin authentication again. */ public static async authenticate( - connection: ReadableWritablePair, + connection: ReadableWritablePair, credentialStore: AdbCredentialStore, authenticators = AdbDefaultAuthenticators, ) { @@ -157,7 +143,7 @@ export class Adb { public readonly tcpip: AdbTcpIpCommand; public constructor( - connection: ReadableWritablePair, + connection: ReadableWritablePair, version: number, maxPayloadSize: number, banner: string, diff --git a/libraries/adb/src/backend.ts b/libraries/adb/src/backend.ts index 0700747e..ca988ce8 100644 --- a/libraries/adb/src/backend.ts +++ b/libraries/adb/src/backend.ts @@ -1,4 +1,5 @@ import type { ValueOrPromise } from '@yume-chan/struct'; +import type { AdbPacketCore, AdbPacketInit } from "./packet.js"; import type { ReadableWritablePair } from "./stream/index.js"; export interface AdbBackend { @@ -6,5 +7,5 @@ export interface AdbBackend { readonly name: string | undefined; - connect(): ValueOrPromise>; + connect(): ValueOrPromise>; } diff --git a/libraries/adb/src/commands/reverse.ts b/libraries/adb/src/commands/reverse.ts index d299efdb..401cbc78 100644 --- a/libraries/adb/src/commands/reverse.ts +++ b/libraries/adb/src/commands/reverse.ts @@ -2,13 +2,13 @@ import { AutoDisposable } from '@yume-chan/event'; import Struct from '@yume-chan/struct'; -import type { AdbPacket } from '../packet.js'; +import type { AdbPacketCore } from '../packet.js'; import type { AdbIncomingSocketEventArgs, AdbPacketDispatcher, AdbSocket } from '../socket/index.js'; import { AdbBufferedStream } from '../stream/index.js'; import { decodeUtf8 } from "../utils/index.js"; export interface AdbReverseHandler { - onSocket(packet: AdbPacket, socket: AdbSocket): void; + onSocket(packet: AdbPacketCore, socket: AdbSocket): void; } export interface AdbForwardListener { diff --git a/libraries/adb/src/socket/dispatcher.ts b/libraries/adb/src/socket/dispatcher.ts index 4bdd3d69..33dbc73d 100644 --- a/libraries/adb/src/socket/dispatcher.ts +++ b/libraries/adb/src/socket/dispatcher.ts @@ -1,6 +1,6 @@ import { AsyncOperationManager, PromiseResolver } from '@yume-chan/async'; import { AutoDisposable, EventEmitter } from '@yume-chan/event'; -import { AdbCommand, AdbPacket, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from '../packet.js'; +import { AdbCommand, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from '../packet.js'; import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '../stream/index.js'; import { decodeUtf8, encodeUtf8 } from '../utils/index.js'; import { AdbSocket } from './socket.js'; @@ -8,7 +8,7 @@ import { AdbSocket } from './socket.js'; export interface AdbIncomingSocketEventArgs { handled: boolean; - packet: AdbPacket; + packet: AdbPacketCore; serviceString: string; @@ -41,7 +41,7 @@ export class AdbPacketDispatcher extends AutoDisposable { private _abortController = new AbortController(); public constructor( - connection: ReadableWritablePair, + connection: ReadableWritablePair, ) { super(); @@ -91,7 +91,7 @@ export class AdbPacketDispatcher extends AutoDisposable { this._writer = connection.writable.getWriter(); } - private handleOk(packet: AdbPacket) { + private handleOk(packet: AdbPacketCore) { if (this.initializers.resolve(packet.arg1, packet.arg0)) { // Device successfully created the socket return; @@ -109,7 +109,7 @@ export class AdbPacketDispatcher extends AutoDisposable { this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0); } - private async handleClose(packet: AdbPacket) { + private async handleClose(packet: AdbPacketCore) { // From https://android.googlesource.com/platform/packages/modules/adb/+/65d18e2c1cc48b585811954892311b28a4c3d188/adb.cpp#459 /* According to protocol.txt, p->msg.arg0 might be 0 to indicate * a failed OPEN only. However, due to a bug in previous ADB @@ -139,7 +139,7 @@ export class AdbPacketDispatcher extends AutoDisposable { // Just ignore it } - private async handleOpen(packet: AdbPacket) { + private async handleOpen(packet: AdbPacketCore) { // AsyncOperationManager doesn't support get and skip an ID // Use `add` + `resolve` to simulate this behavior const [localId] = this.initializers.add(); diff --git a/libraries/adb/src/stream/transform.ts b/libraries/adb/src/stream/transform.ts index 00366e7d..da153f66 100644 --- a/libraries/adb/src/stream/transform.ts +++ b/libraries/adb/src/stream/transform.ts @@ -179,15 +179,11 @@ export class StructDeserializeStream> ) ); - this._readable = new PushReadableStream>( - async controller => { + this._readable = new ReadableStream>({ + async pull(controller) { try { - // Unless we make `deserialize` be capable of pausing/resuming, - // We always need at least one pull loop - while (true) { - const value = await struct.deserialize(incomingStream); - await controller.enqueue(value); - } + const value = await struct.deserialize(incomingStream); + controller.enqueue(value); } catch (e) { if (e instanceof BufferedStreamEndedError) { controller.close(); @@ -196,7 +192,7 @@ export class StructDeserializeStream> throw e; } } - ); + }); this._writable = new WritableStream({ async write(chunk) {