feat(adb): add partial support for ADB server version 40

This commit is contained in:
Simon Chan 2024-12-30 16:40:23 +08:00
parent fba7533d78
commit ab98953b69
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
9 changed files with 396 additions and 187 deletions

View file

@ -0,0 +1,5 @@
---
"@yume-chan/adb": minor
---
Add partial support for ADB server version 40

View file

@ -4,7 +4,7 @@
import "source-map-support/register.js"; import "source-map-support/register.js";
import { Adb, AdbServerClient } from "@yume-chan/adb"; import { Adb, AdbServerClient, Ref } from "@yume-chan/adb";
import { AdbServerNodeTcpConnector } from "@yume-chan/adb-server-node-tcp"; import { AdbServerNodeTcpConnector } from "@yume-chan/adb-server-node-tcp";
import { WritableStream } from "@yume-chan/stream-extra"; import { WritableStream } from "@yume-chan/stream-extra";
import { program } from "commander"; import { program } from "commander";
@ -132,6 +132,8 @@ createDeviceCommand("shell [args...]")
) )
.configureHelp({ showGlobalOptions: true }) .configureHelp({ showGlobalOptions: true })
.action(async (args: string[], options: DeviceCommandOptions) => { .action(async (args: string[], options: DeviceCommandOptions) => {
const ref = new Ref();
const adb = await createAdb(options); const adb = await createAdb(options);
const shell = await adb.subprocess.shell(args); const shell = await adb.subprocess.shell(args);
@ -169,6 +171,8 @@ createDeviceCommand("shell [args...]")
process.exit(1); process.exit(1);
}, },
); );
ref.unref();
}); });
createDeviceCommand("logcat [args...]") createDeviceCommand("logcat [args...]")

View file

@ -3,123 +3,28 @@
import type { MaybePromiseLike } from "@yume-chan/async"; import type { MaybePromiseLike } from "@yume-chan/async";
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import type { Event } from "@yume-chan/event"; import type { Event } from "@yume-chan/event";
import { EventEmitter } from "@yume-chan/event";
import { getUint64LittleEndian } from "@yume-chan/no-data-view"; import { getUint64LittleEndian } from "@yume-chan/no-data-view";
import type { import type {
AbortSignal, AbortSignal,
MaybeConsumable, MaybeConsumable,
ReadableWritablePair, ReadableWritablePair,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra"; } from "@yume-chan/stream-extra";
import { import { AbortController } from "@yume-chan/stream-extra";
BufferedReadableStream,
tryCancel,
tryClose,
} from "@yume-chan/stream-extra";
import {
bipedal,
decodeUtf8,
encodeUtf8,
TextDecoder,
} from "@yume-chan/struct";
import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js"; import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js";
import { AdbBanner } from "../banner.js"; import { AdbBanner } from "../banner.js";
import type { DeviceObserver as DeviceObserverBase } from "../device-observer.js"; import type { DeviceObserver as DeviceObserverBase } from "../device-observer.js";
import type { AdbFeature } from "../features.js"; import type { AdbFeature } from "../features.js";
import { hexToNumber, sequenceEqual, write4HexDigits } from "../utils/index.js"; import { hexToNumber, sequenceEqual } from "../utils/index.js";
import { AdbServerDeviceObserverOwner } from "./observer.js";
import { AdbServerStream, FAIL } from "./stream.js";
import { AdbServerTransport } from "./transport.js"; import { AdbServerTransport } from "./transport.js";
const OKAY = encodeUtf8("OKAY");
const FAIL = encodeUtf8("FAIL");
class AdbServerStream {
#connection: AdbServerClient.ServerConnection;
#buffered: BufferedReadableStream;
#writer: WritableStreamDefaultWriter<Uint8Array>;
constructor(connection: AdbServerClient.ServerConnection) {
this.#connection = connection;
this.#buffered = new BufferedReadableStream(connection.readable);
this.#writer = connection.writable.getWriter();
}
readExactly(length: number): MaybePromiseLike<Uint8Array> {
return this.#buffered.readExactly(length);
}
readString = bipedal(function* (this: AdbServerStream, then) {
const data = yield* then(this.readExactly(4));
const length = hexToNumber(data);
if (length === 0) {
return "";
} else {
const decoder = new TextDecoder();
let result = "";
const iterator = this.#buffered.iterateExactly(length);
while (true) {
const { done, value } = iterator.next();
if (done) {
break;
}
result += decoder.decode(yield* then(value), { stream: true });
}
result += decoder.decode();
return result;
}
});
async writeString(value: string): Promise<void> {
// TODO: investigate using `encodeUtf8("0000" + value)` then modifying the length
// That way allocates a new string (hopefully only a rope) instead of a new buffer
const encoded = encodeUtf8(value);
const buffer = new Uint8Array(4 + encoded.length);
write4HexDigits(buffer, 0, encoded.length);
buffer.set(encoded, 4);
await this.#writer.write(buffer);
}
async readOkay(): Promise<void> {
const response = await this.readExactly(4);
if (sequenceEqual(response, OKAY)) {
// `OKAY` is followed by data length and data
// But different services want to parse the data differently
// So don't read the data here
return;
}
if (sequenceEqual(response, FAIL)) {
const reason = await this.readString();
throw new Error(reason);
}
throw new Error(`Unexpected response: ${decodeUtf8(response)}`);
}
release() {
this.#writer.releaseLock();
return {
readable: this.#buffered.release(),
writable: this.#connection.writable,
closed: this.#connection.closed,
close: () => this.#connection.close(),
};
}
async dispose() {
void tryCancel(this.#buffered);
void tryClose(this.#writer);
await this.#connection.close();
}
}
/** /**
* Client for the ADB Server. * Client for the ADB Server.
*/ */
export class AdbServerClient { export class AdbServerClient {
static readonly VERSION = 41;
static parseDeviceList(value: string): AdbServerClient.Device[] { static parseDeviceList(value: string): AdbServerClient.Device[] {
const devices: AdbServerClient.Device[] = []; const devices: AdbServerClient.Device[] = [];
for (const line of value.split("\n")) { for (const line of value.split("\n")) {
@ -196,6 +101,7 @@ export class AdbServerClient {
readonly wireless = new AdbServerClient.WirelessCommands(this); readonly wireless = new AdbServerClient.WirelessCommands(this);
readonly mDns = new AdbServerClient.MDnsCommands(this); readonly mDns = new AdbServerClient.MDnsCommands(this);
#observerOwner = new AdbServerDeviceObserverOwner(this);
constructor(connector: AdbServerClient.ServerConnector) { constructor(connector: AdbServerClient.ServerConnector) {
this.connector = connector; this.connector = connector;
@ -240,11 +146,11 @@ export class AdbServerClient {
} }
} }
async validateVersion() { async validateVersion(minimalVersion: number) {
const version = await this.getVersion(); const version = await this.getVersion();
if (version !== AdbServerClient.VERSION) { if (version < minimalVersion) {
throw new Error( throw new Error(
`adb server version (${version}) doesn't match this client (${AdbServerClient.VERSION})`, `adb server version (${version}) doesn't match this client (${minimalVersion})`,
); );
} }
} }
@ -288,61 +194,10 @@ export class AdbServerClient {
/** /**
* Monitors device list changes. * Monitors device list changes.
*/ */
async trackDevices(): Promise<AdbServerClient.DeviceObserver> { async trackDevices(
const connection = await this.createConnection("host:track-devices-l"); options?: AdbServerClient.ServerConnectionOptions,
): Promise<AdbServerClient.DeviceObserver> {
let current: AdbServerClient.Device[] = []; return this.#observerOwner.createObserver(options);
const onError = new EventEmitter<Error>();
const onDeviceAdd = new EventEmitter<AdbServerClient.Device[]>();
const onDeviceRemove = new EventEmitter<AdbServerClient.Device[]>();
const onListChange = new EventEmitter<AdbServerClient.Device[]>();
void (async () => {
try {
while (true) {
const response = await connection.readString();
const next = AdbServerClient.parseDeviceList(response);
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = current.findIndex(
(device) =>
device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
current[index] = current[current.length - 1]!;
current.length -= 1;
}
if (added.length) {
onDeviceAdd.fire(added);
}
if (current.length) {
onDeviceRemove.fire(current);
}
current = next;
onListChange.fire(current);
}
} catch (e) {
onError.fire(e as Error);
}
})();
return {
onError: onError.event,
onDeviceAdd: onDeviceAdd.event,
onDeviceRemove: onDeviceRemove.event,
onListChange: onListChange.event,
get current() {
return current;
},
stop: () => connection.dispose(),
};
} }
/** /**
@ -412,20 +267,22 @@ export class AdbServerClient {
device: AdbServerClient.DeviceSelector, device: AdbServerClient.DeviceSelector,
service: string, service: string,
): Promise<AdbServerClient.Socket> { ): Promise<AdbServerClient.Socket> {
await this.validateVersion();
let switchService: string; let switchService: string;
let transportId: bigint | undefined; let transportId: bigint | undefined;
if (!device) { if (!device) {
await this.validateVersion(41);
switchService = `host:tport:any`; switchService = `host:tport:any`;
} else if ("transportId" in device) { } else if ("transportId" in device) {
switchService = `host:transport-id:${device.transportId}`; switchService = `host:transport-id:${device.transportId}`;
transportId = device.transportId; transportId = device.transportId;
} else if ("serial" in device) { } else if ("serial" in device) {
await this.validateVersion(41);
switchService = `host:tport:serial:${device.serial}`; switchService = `host:tport:serial:${device.serial}`;
} else if ("usb" in device) { } else if ("usb" in device) {
await this.validateVersion(41);
switchService = `host:tport:usb`; switchService = `host:tport:usb`;
} else if ("tcp" in device) { } else if ("tcp" in device) {
await this.validateVersion(41);
switchService = `host:tport:local`; switchService = `host:tport:local`;
} else { } else {
throw new TypeError("Invalid device selector"); throw new TypeError("Invalid device selector");
@ -467,18 +324,7 @@ export class AdbServerClient {
throw e; throw e;
} }
} }
async #waitForUnchecked(
/**
* Wait for a device to be connected or disconnected.
*
* `adb wait-for-<state>`
*
* @param device The device selector
* @param state The state to wait for
* @param options The options
* @returns A promise that resolves when the condition is met.
*/
async waitFor(
device: AdbServerClient.DeviceSelector, device: AdbServerClient.DeviceSelector,
state: "device" | "disconnect", state: "device" | "disconnect",
options?: AdbServerClient.ServerConnectionOptions, options?: AdbServerClient.ServerConnectionOptions,
@ -513,6 +359,60 @@ export class AdbServerClient {
} }
} }
/**
* Wait for a device to be connected or disconnected.
*
* `adb wait-for-<state>`
*
* @param device The device selector
* @param state The state to wait for
* @param options The options
* @returns A promise that resolves when the condition is met.
*/
async waitFor(
device: AdbServerClient.DeviceSelector,
state: "device" | "disconnect",
options?: AdbServerClient.ServerConnectionOptions,
): Promise<void> {
if (state === "disconnect") {
await this.validateVersion(41);
}
return this.#waitForUnchecked(device, state, options);
}
async waitForDisconnect(
transportId: bigint,
options?: AdbServerClient.ServerConnectionOptions,
): Promise<void> {
const serverVersion = await this.getVersion();
if (serverVersion >= 41) {
return this.#waitForUnchecked(
{ transportId },
"disconnect",
options,
);
} else {
const observer = await this.trackDevices(options);
return new Promise<void>((resolve, reject) => {
observer.onDeviceRemove((devices) => {
if (
devices.some(
(device) => device.transportId === transportId,
)
) {
observer.stop();
resolve();
}
});
observer.onError((e) => {
observer.stop();
reject(e);
});
});
}
}
/** /**
* Creates an ADB Transport for the specified device. * Creates an ADB Transport for the specified device.
*/ */
@ -533,12 +433,26 @@ export class AdbServerClient {
features, features,
); );
return new AdbServerTransport( const waitAbortController = new AbortController();
const disconnected = this.waitForDisconnect(transportId, {
unref: true,
signal: waitAbortController.signal,
});
const transport = new AdbServerTransport(
this, this,
info?.serial ?? "", info?.serial ?? "",
banner, banner,
transportId, transportId,
disconnected,
); );
transport.disconnected.then(
() => waitAbortController.abort(),
() => waitAbortController.abort(),
);
return transport;
} }
} }

View file

@ -1,2 +1,4 @@
export * from "./client.js"; export * from "./client.js";
export * from "./observer.js";
export * from "./stream.js";
export * from "./transport.js"; export * from "./transport.js";

View file

@ -0,0 +1,144 @@
import { EventEmitter } from "@yume-chan/event";
import { Ref } from "../utils/index.js";
import { AdbServerClient } from "./client.js";
import type { AdbServerStream } from "./stream.js";
function unorderedRemove<T>(array: T[], index: number) {
array[index] = array[array.length - 1]!;
array.length -= 1;
}
export class AdbServerDeviceObserverOwner {
current: AdbServerClient.Device[] = [];
#client: AdbServerClient;
#stream: Promise<AdbServerStream> | undefined;
#observers: {
onDeviceAdd: EventEmitter<AdbServerClient.Device[]>;
onDeviceRemove: EventEmitter<AdbServerClient.Device[]>;
onListChange: EventEmitter<AdbServerClient.Device[]>;
onError: EventEmitter<Error>;
}[] = [];
constructor(client: AdbServerClient) {
this.#client = client;
}
async #receive(stream: AdbServerStream) {
try {
while (true) {
const response = await stream.readString();
const next = AdbServerClient.parseDeviceList(response);
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = this.current.findIndex(
(device) =>
device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
unorderedRemove(this.current, index);
}
if (added.length) {
for (const observer of this.#observers) {
observer.onDeviceAdd.fire(added);
}
}
if (this.current.length) {
for (const observer of this.#observers) {
observer.onDeviceRemove.fire(this.current);
}
}
this.current = next;
for (const observer of this.#observers) {
observer.onListChange.fire(this.current);
}
}
} catch (e) {
for (const observer of this.#observers) {
observer.onError.fire(e as Error);
}
}
}
async #connect() {
const stream = await this.#client.createConnection(
"host:track-devices-l",
// Each individual observer will ref depending on their options
{ unref: true },
);
void this.#receive(stream);
return stream;
}
async #handleObserverStop(stream: AdbServerStream) {
if (this.#observers.length === 0) {
this.#stream = undefined;
await stream.dispose();
}
}
async createObserver(
options?: AdbServerClient.ServerConnectionOptions,
): Promise<AdbServerClient.DeviceObserver> {
if (options?.signal?.aborted) {
throw options.signal.reason;
}
this.#stream ??= this.#connect();
const stream = await this.#stream;
if (options?.signal?.aborted) {
await this.#handleObserverStop(stream);
throw options.signal.reason;
}
const onDeviceAdd = new EventEmitter<AdbServerClient.Device[]>();
const onDeviceRemove = new EventEmitter<AdbServerClient.Device[]>();
const onListChange = new EventEmitter<AdbServerClient.Device[]>();
const onError = new EventEmitter<Error>();
const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError };
this.#observers.push(observer);
const ref = new Ref(options);
const stop = async () => {
const index = self.#observers.indexOf(observer);
if (index === -1) {
return;
}
unorderedRemove(this.#observers, index);
await this.#handleObserverStop(stream);
ref.unref();
};
options?.signal?.addEventListener("abort", () => void stop());
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
return {
onDeviceAdd: onDeviceAdd.event,
onDeviceRemove: onDeviceRemove.event,
onListChange: onListChange.event,
onError: onError.event,
get current() {
return self.current;
},
stop,
};
}
}

View file

@ -0,0 +1,100 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra";
import {
BufferedReadableStream,
tryCancel,
tryClose,
} from "@yume-chan/stream-extra";
import {
bipedal,
decodeUtf8,
encodeUtf8,
TextDecoder,
} from "@yume-chan/struct";
import { hexToNumber, sequenceEqual, write4HexDigits } from "../utils/index.js";
import type { AdbServerClient } from "./client.js";
const OKAY = encodeUtf8("OKAY");
export const FAIL = encodeUtf8("FAIL");
export class AdbServerStream {
#connection: AdbServerClient.ServerConnection;
#buffered: BufferedReadableStream;
#writer: WritableStreamDefaultWriter<Uint8Array>;
constructor(connection: AdbServerClient.ServerConnection) {
this.#connection = connection;
this.#buffered = new BufferedReadableStream(connection.readable);
this.#writer = connection.writable.getWriter();
}
readExactly(length: number): MaybePromiseLike<Uint8Array> {
return this.#buffered.readExactly(length);
}
readString = bipedal(function* (this: AdbServerStream, then) {
const data = yield* then(this.readExactly(4));
const length = hexToNumber(data);
if (length === 0) {
return "";
} else {
const decoder = new TextDecoder();
let result = "";
const iterator = this.#buffered.iterateExactly(length);
while (true) {
const { done, value } = iterator.next();
if (done) {
break;
}
result += decoder.decode(yield* then(value), { stream: true });
}
result += decoder.decode();
return result;
}
});
async readOkay(): Promise<void> {
const response = await this.readExactly(4);
if (sequenceEqual(response, OKAY)) {
// `OKAY` is followed by data length and data
// But different services want to parse the data differently
// So don't read the data here
return;
}
if (sequenceEqual(response, FAIL)) {
const reason = await this.readString();
throw new Error(reason);
}
throw new Error(`Unexpected response: ${decodeUtf8(response)}`);
}
async writeString(value: string): Promise<void> {
// TODO: investigate using `encodeUtf8("0000" + value)` then modifying the length
// That way allocates a new string (hopefully only a rope) instead of a new buffer
const encoded = encodeUtf8(value);
const buffer = new Uint8Array(4 + encoded.length);
write4HexDigits(buffer, 0, encoded.length);
buffer.set(encoded, 4);
await this.#writer.write(buffer);
}
release() {
this.#writer.releaseLock();
return {
readable: this.#buffered.release(),
writable: this.#connection.writable,
closed: this.#connection.closed,
close: () => this.#connection.close(),
};
}
async dispose() {
void tryCancel(this.#buffered);
void tryClose(this.#writer);
await this.#connection.close();
}
}

View file

@ -1,6 +1,4 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import { AbortController } from "@yume-chan/stream-extra";
import type { import type {
AdbIncomingSocketHandler, AdbIncomingSocketHandler,
@ -44,9 +42,13 @@ export class AdbServerTransport implements AdbTransport {
readonly banner: AdbBanner; readonly banner: AdbBanner;
#sockets: AdbSocket[] = [];
#closed = new PromiseResolver<void>(); #closed = new PromiseResolver<void>();
#waitAbortController = new AbortController(); #disconnected: Promise<void>;
readonly disconnected: Promise<void>; get disconnected() {
return this.#disconnected;
}
get clientFeatures() { get clientFeatures() {
// No need to get host features (features supported by ADB server) // No need to get host features (features supported by ADB server)
@ -54,31 +56,29 @@ export class AdbServerTransport implements AdbTransport {
return ADB_SERVER_DEFAULT_FEATURES; return ADB_SERVER_DEFAULT_FEATURES;
} }
// eslint-disable-next-line @typescript-eslint/max-params
constructor( constructor(
client: AdbServerClient, client: AdbServerClient,
serial: string, serial: string,
banner: AdbBanner, banner: AdbBanner,
transportId: bigint, transportId: bigint,
disconnected: Promise<void>,
) { ) {
this.#client = client; this.#client = client;
this.serial = serial; this.serial = serial;
this.banner = banner; this.banner = banner;
this.transportId = transportId; this.transportId = transportId;
this.disconnected = Promise.race([ this.#disconnected = Promise.race([this.#closed.promise, disconnected]);
this.#closed.promise,
client.waitFor({ transportId }, "disconnect", {
signal: this.#waitAbortController.signal,
unref: true,
}),
]);
} }
async connect(service: string): Promise<AdbSocket> { async connect(service: string): Promise<AdbSocket> {
return await this.#client.createDeviceConnection( const socket = await this.#client.createDeviceConnection(
{ transportId: this.transportId }, { transportId: this.transportId },
service, service,
); );
this.#sockets.push(socket);
return socket;
} }
async addReverseTunnel( async addReverseTunnel(
@ -96,8 +96,10 @@ export class AdbServerTransport implements AdbTransport {
await this.#client.connector.clearReverseTunnels(); await this.#client.connector.clearReverseTunnels();
} }
close(): MaybePromiseLike<void> { async close(): Promise<void> {
for (const socket of this.#sockets) {
await socket.close();
}
this.#closed.resolve(); this.#closed.resolve();
this.#waitAbortController.abort();
} }
} }

View file

@ -3,4 +3,5 @@ export * from "./auto-reset-event.js";
export * from "./base64.js"; export * from "./base64.js";
export * from "./hex.js"; export * from "./hex.js";
export * from "./no-op.js"; export * from "./no-op.js";
export * from "./ref.js";
export * from "./sequence-equal.js"; export * from "./sequence-equal.js";

View file

@ -0,0 +1,37 @@
interface GlobalExtension {
setInterval: (callback: () => void, delay: number) => number;
clearInterval: (id: number) => void;
}
const { setInterval, clearInterval } = globalThis as unknown as GlobalExtension;
/**
* An object to keep current Node.js process alive even when no code is running.
*
* Does nothing in Web environments.
*
* Note that it does't have reference counting. Calling `unref` will
* remove the ref no matter how many times `ref` has been previously called, and vice versa.
* This is the same as how Node.js works.
*/
export class Ref {
#intervalId: number | undefined;
constructor(options?: { unref?: boolean | undefined }) {
if (!options?.unref) {
this.ref();
}
}
ref() {
// `setInterval` can keep current Node.js alive, the delay value doesn't matter
this.#intervalId = setInterval(() => {}, 60 * 1000);
}
unref() {
if (this.#intervalId) {
clearInterval(this.#intervalId);
this.#intervalId = undefined;
}
}
}