refactor: performance optimizations

also renamed some internal types, and adding more code comments. (not thoroughly tested yet)
This commit is contained in:
Simon Chan 2024-05-04 23:47:52 +08:00
parent 65b8671b66
commit 8223d79886
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
49 changed files with 932 additions and 767 deletions

View file

@ -24,27 +24,56 @@ import { AdbCommand, calculateChecksum } from "./packet.js";
import { AdbDaemonSocketController } from "./socket.js";
export interface AdbPacketDispatcherOptions {
calculateChecksum: boolean;
/**
* Before Android 9.0, ADB uses `char*` to parse service string,
* From Android 9.0, ADB stopped checking the checksum in packet header to improve performance.
*
* The value should be inferred from the device's ADB protocol version.
*/
calculateChecksum: boolean;
/**
* Before Android 9.0, ADB uses `char*` to parse service strings,
* thus requires a null character to terminate.
*
* Usually it should have the same value as `calculateChecksum`.
* The value should be inferred from the device's ADB protocol version.
* Usually it should have the same value as `calculateChecksum`, since they both changed
* in Android 9.0.
*/
appendNullToServiceString: boolean;
maxPayloadSize: number;
/**
* The number of bytes the device can send before receiving an ack packet.
* Set to 0 or any negative value to disable delayed ack.
* Otherwise the value must be in the range of unsigned 32-bit integer.
*/
initialDelayedAckBytes: number;
maxPayloadSize: number;
/**
* Whether to preserve the connection open after the `AdbPacketDispatcher` is closed.
* Whether to keep the `connection` open (don't call `writable.close` and `readable.cancel`)
* when `AdbPacketDispatcher.close` is called.
*
* @default false
*/
preserveConnection?: boolean | undefined;
debugSlowRead?: boolean | undefined;
/**
* The number of bytes the device can send before receiving an ack packet.
* Using delayed ack can improve the throughput,
* especially when the device is connected over Wi-Fi (so the latency is higher).
*
* This must be the negotiated value between the client and device. If the device enabled
* delayed ack but the client didn't, the device will throw an error when the client sends
* the first `WRTE` packet. And vice versa.
*/
initialDelayedAckBytes: number;
/**
* When set, the dispatcher will throw an error when
* one of the socket readable stalls for this amount of milliseconds.
*
* Because ADB is a multiplexed protocol, blocking one socket will also block all other sockets.
* It's important to always read from all sockets to prevent stalling.
*
* This option is helpful to detect bugs in the client code.
*
* @default false
*/
readTimeLimit?: number | undefined;
}
interface SocketOpenResult {
@ -225,12 +254,19 @@ export class AdbPacketDispatcher implements Closeable {
// Maybe the device is responding to a packet of last connection
// Tell the device to close the socket
void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
void this.sendPacket(
AdbCommand.Close,
packet.arg1,
packet.arg0,
EMPTY_UINT8_ARRAY,
);
}
#sendOkay(localId: number, remoteId: number, ackBytes: number) {
let payload: Uint8Array;
if (this.options.initialDelayedAckBytes !== 0) {
// TODO: try reusing this buffer to reduce memory allocation
// However, that requires blocking reentrance of `sendOkay`, which might be more expensive
payload = new Uint8Array(4);
setUint32LittleEndian(payload, 0, ackBytes);
} else {
@ -241,22 +277,24 @@ export class AdbPacketDispatcher implements Closeable {
}
async #handleOpen(packet: AdbPacketData) {
// `AsyncOperationManager` doesn't support skipping IDs
// Use `add` + `resolve` to simulate this behavior
// Allocate a local ID for the socket from `#initializers`.
// `AsyncOperationManager` doesn't directly support returning the next ID,
// so use `add` + `resolve` to simulate this
const [localId] = this.#initializers.add<number>();
this.#initializers.resolve(localId, undefined);
const remoteId = packet.arg0;
let initialDelayedAckBytes = packet.arg1;
let availableWriteBytes = packet.arg1;
const service = decodeUtf8(packet.payload);
// Check remote delayed ack enablement is consistent with local
if (this.options.initialDelayedAckBytes === 0) {
if (initialDelayedAckBytes !== 0) {
if (availableWriteBytes !== 0) {
throw new Error("Invalid OPEN packet. arg1 should be 0");
}
initialDelayedAckBytes = Infinity;
availableWriteBytes = Infinity;
} else {
if (initialDelayedAckBytes === 0) {
if (availableWriteBytes === 0) {
throw new Error(
"Invalid OPEN packet. arg1 should be greater than 0",
);
@ -265,7 +303,12 @@ export class AdbPacketDispatcher implements Closeable {
const handler = this.#incomingSocketHandlers.get(service);
if (!handler) {
await this.sendPacket(AdbCommand.Close, 0, remoteId);
await this.sendPacket(
AdbCommand.Close,
0,
remoteId,
EMPTY_UINT8_ARRAY,
);
return;
}
@ -275,8 +318,8 @@ export class AdbPacketDispatcher implements Closeable {
remoteId,
localCreated: false,
service,
availableWriteBytes,
});
controller.ack(initialDelayedAckBytes);
try {
await handler(controller.socket);
@ -287,7 +330,12 @@ export class AdbPacketDispatcher implements Closeable {
this.options.initialDelayedAckBytes,
);
} catch (e) {
await this.sendPacket(AdbCommand.Close, 0, remoteId);
await this.sendPacket(
AdbCommand.Close,
0,
remoteId,
EMPTY_UINT8_ARRAY,
);
}
}
@ -298,14 +346,8 @@ export class AdbPacketDispatcher implements Closeable {
}
let handled = false;
await Promise.race([
delay(5000).then(() => {
if (this.options.debugSlowRead && !handled) {
throw new Error(
`packet for \`${socket.service}\` not handled in 5 seconds`,
);
}
}),
const promises: Promise<void>[] = [
(async () => {
await socket.enqueue(packet.payload);
await this.#sendOkay(
@ -315,9 +357,22 @@ export class AdbPacketDispatcher implements Closeable {
);
handled = true;
})(),
]);
];
return;
if (this.options.readTimeLimit) {
promises.push(
(async () => {
await delay(this.options.readTimeLimit!);
if (!handled) {
throw new Error(
`readable of \`${socket.service}\` has stalled for ${this.options.readTimeLimit} milliseconds`,
);
}
})(),
);
}
await Promise.race(promises);
}
async createSocket(service: string): Promise<AdbSocket> {
@ -342,8 +397,8 @@ export class AdbPacketDispatcher implements Closeable {
remoteId,
localCreated: true,
service,
availableWriteBytes,
});
controller.ack(availableWriteBytes);
this.#sockets.set(localId, controller);
return controller.socket;
@ -365,7 +420,8 @@ export class AdbPacketDispatcher implements Closeable {
command: AdbCommand,
arg0: number,
arg1: number,
payload: string | Uint8Array = EMPTY_UINT8_ARRAY,
// PERF: It's slightly faster to not use default parameter values
payload: string | Uint8Array,
): Promise<void> {
if (typeof payload === "string") {
payload = encodeUtf8(payload);