refactor(adb): let backends deserialize packets by themselves for better optimization

This commit is contained in:
Simon Chan 2022-04-03 12:49:38 +08:00
parent 38a76a2e0c
commit 8a521c8d93
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
11 changed files with 61 additions and 70 deletions

View file

@ -158,25 +158,17 @@ function _Connect(): JSX.Element | null {
try {
setConnecting(true);
const dataStreamPair = await selectedBackend.connect();
const packetStreamPair = Adb.createConnection({
readable: dataStreamPair.readable
.pipeThrough(new InspectStream(chunk => {
byteInAcc.current += chunk.byteLength;
})),
writable: dataStreamPair.writable,
});
const streams = await selectedBackend.connect();
// Use `TransformStream` to intercept packets and log them
const readable = packetStreamPair.readable
const readable = streams.readable
.pipeThrough(
new InspectStream(packet => {
globalState.appendLog('Incoming', packet);
})
);
const writable = pipeFrom(
packetStreamPair.writable,
streams.writable,
new InspectStream(packet => {
globalState.appendLog('Outgoing', packet);
})

View file

@ -1,4 +1,4 @@
import { AdbBackend, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/adb';
import { AdbBackend, AdbPacket, AdbPacketSerializeStream, pipeFrom, ReadableStream, StructDeserializeStream, WrapReadableStream, WrapWritableStream, WritableStream } from '@yume-chan/adb';
declare global {
interface TCPSocket {
@ -55,6 +55,7 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
remotePort: this.port,
noDelay: true,
});
// Native streams can't `pipeTo()` or `pipeThrough()` polyfilled streams, so we need to wrap them
return {
readable: new WrapReadableStream<Uint8Array, ReadableStream<Uint8Array>, void>({
@ -64,15 +65,15 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
state: undefined,
};
}
}),
writable: new WrapWritableStream({
}).pipeThrough(new StructDeserializeStream(AdbPacket)),
writable: pipeFrom(new WrapWritableStream({
async start() {
return {
writable,
state: undefined,
};
}
}),
}), new AdbPacketSerializeStream()),
};
}
}

View file

@ -33,6 +33,7 @@
"dependencies": {
"@types/w3c-web-usb": "^1.0.4",
"@yume-chan/adb": "^0.0.10",
"@yume-chan/struct": "^0.0.10",
"tslib": "^2.3.1"
},
"devDependencies": {

View file

@ -1,4 +1,5 @@
import { DuplexStreamFactory, type AdbBackend, type ReadableStream, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb';
import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, type AdbBackend, type AdbPacketCore, type AdbPacketInit, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb';
import type { StructAsyncDeserializeStream } from "@yume-chan/struct";
export const WebUsbDeviceFilter: USBDeviceFilter = {
classCode: 0xFF,
@ -6,15 +7,15 @@ export const WebUsbDeviceFilter: USBDeviceFilter = {
protocolCode: 1,
};
export class AdbWebUsbBackendStream implements ReadableWritablePair<Uint8Array, Uint8Array>{
private _readable: ReadableStream<Uint8Array>;
export class AdbWebUsbBackendStream implements ReadableWritablePair<AdbPacketCore, AdbPacketInit>{
private _readable: ReadableStream<AdbPacketCore>;
public get readable() { return this._readable; }
private _writable: WritableStream<Uint8Array>;
private _writable: WritableStream<AdbPacketInit>;
public get writable() { return this._writable; }
public constructor(device: USBDevice, inEndpoint: USBEndpoint, outEndpoint: USBEndpoint) {
const factory = new DuplexStreamFactory<Uint8Array, Uint8Array>({
const factory = new DuplexStreamFactory<AdbPacketCore, Uint8Array>({
close: async () => {
navigator.usb.removeEventListener('disconnect', handleUsbDisconnect);
try {
@ -33,9 +34,13 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair<Uint8Array,
navigator.usb.addEventListener('disconnect', handleUsbDisconnect);
this._readable = factory.createReadable({
pull: async (controller) => {
const result = await device.transferIn(inEndpoint.endpointNumber, inEndpoint.packetSize);
const incomingStream: StructAsyncDeserializeStream = {
async read(length) {
// `ReadableStream<Uin8Array>` don't know how many bytes the consumer need in each `pull`,
// But `transferIn(endpointNumber, packetSize)` is much slower than `transferIn(endpointNumber, length)`
// So `AdbBackend` is refactored to use `ReadableStream<AdbPacketCore>` directly,
// (let each backend deserialize the packets in their own way)
const result = await device.transferIn(inEndpoint.endpointNumber, length);
// `USBTransferResult` has three states: "ok", "stall" and "babble",
// but ADBd on Android won't enter "stall" (halt) state,
@ -44,22 +49,25 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair<Uint8Array,
// "babble" just means there is more data to be read.
// From spec, the `result.data` always covers the whole `buffer`.
const chunk = new Uint8Array(result.data!.buffer);
controller.enqueue(chunk);
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
return new Uint8Array(result.data!.buffer);
}
};
this._writable = factory.createWritable({
this._readable = factory.createWrapReadable(new ReadableStream<AdbPacketCore>({
async pull(controller) {
const value = await AdbPacket.deserialize(incomingStream);
controller.enqueue(value);
},
}));
this._writable = pipeFrom(factory.createWritable({
write: async (chunk) => {
await device.transferOut(outEndpoint.endpointNumber, chunk);
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
}), new AdbPacketSerializeStream());
}
}

View file

@ -12,6 +12,9 @@
"references": [
{
"path": "../adb/tsconfig.build.json"
},
{
"path": "../struct/tsconfig.json"
}
]
}

View file

@ -1,4 +1,4 @@
import { DuplexStreamFactory, type AdbBackend } from '@yume-chan/adb';
import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb';
export default class AdbWsBackend implements AdbBackend {
public readonly serial: string;
@ -51,6 +51,9 @@ export default class AdbWsBackend implements AdbBackend {
size(chunk) { return chunk.byteLength; },
});
return { readable, writable };
return {
readable: readable.pipeThrough(new StructDeserializeStream(AdbPacket)),
writable: pipeFrom(writable, new AdbPacketSerializeStream()),
};
}
}

View file

@ -2,9 +2,9 @@ import { PromiseResolver } from '@yume-chan/async';
import { AdbAuthenticationHandler, AdbDefaultAuthenticators, type AdbCredentialStore } from './auth.js';
import { AdbPower, AdbReverseCommand, AdbSubprocess, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install, type AdbFrameBuffer } from './commands/index.js';
import { AdbFeatures } from './features.js';
import { AdbCommand, AdbPacket, AdbPacketSerializeStream, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from './packet.js';
import { AdbCommand, AdbPacket, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from './packet.js';
import { AdbPacketDispatcher, AdbSocket } from './socket/index.js';
import { AbortController, DecodeUtf8Stream, GatherStringStream, pipeFrom, StructDeserializeStream, WritableStream, type ReadableWritablePair } from "./stream/index.js";
import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from "./stream/index.js";
import { decodeUtf8, encodeUtf8 } from "./utils/index.js";
export enum AdbPropKey {
@ -17,27 +17,13 @@ export enum AdbPropKey {
export const VERSION_OMIT_CHECKSUM = 0x01000001;
export class Adb {
public static createConnection(
connection: ReadableWritablePair<Uint8Array, Uint8Array>
): ReadableWritablePair<AdbPacket, AdbPacketCore> {
return {
readable: connection.readable.pipeThrough(
new StructDeserializeStream(AdbPacket)
),
writable: pipeFrom(
connection.writable,
new AdbPacketSerializeStream()
),
};
}
/**
* It's possible to call `authenticate` multiple times on a single connection,
* every time the device receives a `CNXN` packet it will reset its internal state,
* and begin authentication again.
*/
public static async authenticate(
connection: ReadableWritablePair<AdbPacket, AdbPacketCore>,
connection: ReadableWritablePair<AdbPacketCore, AdbPacketCore>,
credentialStore: AdbCredentialStore,
authenticators = AdbDefaultAuthenticators,
) {
@ -157,7 +143,7 @@ export class Adb {
public readonly tcpip: AdbTcpIpCommand;
public constructor(
connection: ReadableWritablePair<AdbPacket, AdbPacketInit>,
connection: ReadableWritablePair<AdbPacketCore, AdbPacketInit>,
version: number,
maxPayloadSize: number,
banner: string,

View file

@ -1,4 +1,5 @@
import type { ValueOrPromise } from '@yume-chan/struct';
import type { AdbPacketCore, AdbPacketInit } from "./packet.js";
import type { ReadableWritablePair } from "./stream/index.js";
export interface AdbBackend {
@ -6,5 +7,5 @@ export interface AdbBackend {
readonly name: string | undefined;
connect(): ValueOrPromise<ReadableWritablePair<Uint8Array, Uint8Array>>;
connect(): ValueOrPromise<ReadableWritablePair<AdbPacketCore, AdbPacketInit>>;
}

View file

@ -2,13 +2,13 @@
import { AutoDisposable } from '@yume-chan/event';
import Struct from '@yume-chan/struct';
import type { AdbPacket } from '../packet.js';
import type { AdbPacketCore } from '../packet.js';
import type { AdbIncomingSocketEventArgs, AdbPacketDispatcher, AdbSocket } from '../socket/index.js';
import { AdbBufferedStream } from '../stream/index.js';
import { decodeUtf8 } from "../utils/index.js";
export interface AdbReverseHandler {
onSocket(packet: AdbPacket, socket: AdbSocket): void;
onSocket(packet: AdbPacketCore, socket: AdbSocket): void;
}
export interface AdbForwardListener {

View file

@ -1,6 +1,6 @@
import { AsyncOperationManager, PromiseResolver } from '@yume-chan/async';
import { AutoDisposable, EventEmitter } from '@yume-chan/event';
import { AdbCommand, AdbPacket, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from '../packet.js';
import { AdbCommand, calculateChecksum, type AdbPacketCore, type AdbPacketInit } from '../packet.js';
import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '../stream/index.js';
import { decodeUtf8, encodeUtf8 } from '../utils/index.js';
import { AdbSocket } from './socket.js';
@ -8,7 +8,7 @@ import { AdbSocket } from './socket.js';
export interface AdbIncomingSocketEventArgs {
handled: boolean;
packet: AdbPacket;
packet: AdbPacketCore;
serviceString: string;
@ -41,7 +41,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
private _abortController = new AbortController();
public constructor(
connection: ReadableWritablePair<AdbPacket, AdbPacketInit>,
connection: ReadableWritablePair<AdbPacketCore, AdbPacketInit>,
) {
super();
@ -91,7 +91,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
this._writer = connection.writable.getWriter();
}
private handleOk(packet: AdbPacket) {
private handleOk(packet: AdbPacketCore) {
if (this.initializers.resolve(packet.arg1, packet.arg0)) {
// Device successfully created the socket
return;
@ -109,7 +109,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
}
private async handleClose(packet: AdbPacket) {
private async handleClose(packet: AdbPacketCore) {
// From https://android.googlesource.com/platform/packages/modules/adb/+/65d18e2c1cc48b585811954892311b28a4c3d188/adb.cpp#459
/* According to protocol.txt, p->msg.arg0 might be 0 to indicate
* a failed OPEN only. However, due to a bug in previous ADB
@ -139,7 +139,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
// Just ignore it
}
private async handleOpen(packet: AdbPacket) {
private async handleOpen(packet: AdbPacketCore) {
// AsyncOperationManager doesn't support get and skip an ID
// Use `add` + `resolve` to simulate this behavior
const [localId] = this.initializers.add<number>();

View file

@ -179,15 +179,11 @@ export class StructDeserializeStream<T extends Struct<any, any, any, any>>
)
);
this._readable = new PushReadableStream<StructValueType<T>>(
async controller => {
this._readable = new ReadableStream<StructValueType<T>>({
async pull(controller) {
try {
// Unless we make `deserialize` be capable of pausing/resuming,
// We always need at least one pull loop
while (true) {
const value = await struct.deserialize(incomingStream);
await controller.enqueue(value);
}
controller.enqueue(value);
} catch (e) {
if (e instanceof BufferedStreamEndedError) {
controller.close();
@ -196,7 +192,7 @@ export class StructDeserializeStream<T extends Struct<any, any, any, any>>
throw e;
}
}
);
});
this._writable = new WritableStream({
async write(chunk) {