feat(adb): rewrite process spawner API (#739)

This commit is contained in:
Simon Chan 2025-04-02 15:20:05 +08:00 committed by GitHub
parent 46e78401a4
commit d3019ce738
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
75 changed files with 1422 additions and 1022 deletions

View file

@ -10,7 +10,7 @@ import type { AdbFrameBuffer } from "./commands/index.js";
import {
AdbPower,
AdbReverseCommand,
AdbSubprocess,
AdbSubprocessService,
AdbSync,
AdbTcpIpCommand,
escapeArg,
@ -30,7 +30,7 @@ export interface AdbSocket
Closeable {
get service(): string;
get closed(): Promise<void>;
get closed(): Promise<undefined>;
}
export type AdbIncomingSocketHandler = (
@ -87,7 +87,7 @@ export class Adb implements Closeable {
return this.banner.features;
}
readonly subprocess: AdbSubprocess;
readonly subprocess: AdbSubprocessService;
readonly power: AdbPower;
readonly reverse: AdbReverseCommand;
readonly tcpip: AdbTcpIpCommand;
@ -95,7 +95,7 @@ export class Adb implements Closeable {
constructor(transport: AdbTransport) {
this.transport = transport;
this.subprocess = new AdbSubprocess(this);
this.subprocess = new AdbSubprocessService(this);
this.power = new AdbPower(this);
this.reverse = new AdbReverseCommand(this);
this.tcpip = new AdbTcpIpCommand(this);
@ -122,15 +122,13 @@ export class Adb implements Closeable {
.pipeThrough(new ConcatStringStream());
}
async getProp(key: string): Promise<string> {
const stdout = await this.subprocess.spawnAndWaitLegacy([
"getprop",
key,
]);
return stdout.trim();
getProp(key: string): Promise<string> {
return this.subprocess.noneProtocol
.spawnWaitText(["getprop", key])
.then((output) => output.trim());
}
async rm(
rm(
filenames: string | string[],
options?: { recursive?: boolean; force?: boolean },
): Promise<string> {
@ -150,8 +148,8 @@ export class Adb implements Closeable {
}
// https://android.googlesource.com/platform/packages/modules/adb/+/1a0fb8846d4e6b671c8aa7f137a8c21d7b248716/client/adb_install.cpp#984
args.push("</dev/null");
const stdout = await this.subprocess.spawnAndWaitLegacy(args);
return stdout;
return this.subprocess.noneProtocol.spawnWaitText(args);
}
async sync(): Promise<AdbSync> {

View file

@ -2,11 +2,14 @@ import { AutoDisposable } from "@yume-chan/event";
import type { Adb } from "../adb.js";
export class AdbCommandBase extends AutoDisposable {
protected adb: Adb;
export class AdbServiceBase extends AutoDisposable {
#adb: Adb;
get adb() {
return this.#adb;
}
constructor(adb: Adb) {
super();
this.adb = adb;
this.#adb = adb;
}
}

View file

@ -3,9 +3,9 @@
// cspell: ignore keyevent
// cspell: ignore longpress
import { AdbCommandBase } from "./base.js";
import { AdbServiceBase } from "./base.js";
export class AdbPower extends AdbCommandBase {
export class AdbPower extends AdbServiceBase {
reboot(mode = "") {
return this.adb.createSocketAndWait(`reboot:${mode}`);
}
@ -35,18 +35,18 @@ export class AdbPower extends AdbCommandBase {
return this.reboot("edl");
}
powerOff() {
return this.adb.subprocess.spawnAndWaitLegacy(["reboot", "-p"]);
powerOff(): Promise<string> {
return this.adb.subprocess.noneProtocol.spawnWaitText(["reboot", "-p"]);
}
powerButton(longPress = false) {
powerButton(longPress = false): Promise<string> {
const args = ["input", "keyevent"];
if (longPress) {
args.push("--longpress");
}
args.push("POWER");
return this.adb.subprocess.spawnAndWaitLegacy(args);
return this.adb.subprocess.noneProtocol.spawnWaitText(args);
}
/**

View file

@ -1,144 +0,0 @@
import type { AbortSignal } from "@yume-chan/stream-extra";
import { ConcatStringStream, TextDecoderStream } from "@yume-chan/stream-extra";
import { AdbCommandBase } from "../base.js";
import type {
AdbSubprocessProtocol,
AdbSubprocessProtocolConstructor,
} from "./protocols/index.js";
import {
AdbSubprocessNoneProtocol,
AdbSubprocessShellProtocol,
} from "./protocols/index.js";
export interface AdbSubprocessOptions {
/**
* A list of `AdbSubprocessProtocolConstructor`s to be used.
*
* Different `AdbSubprocessProtocol` has different capabilities, thus requires specific adaptations.
* Check their documentations for details.
*
* The first protocol whose `isSupported` returns `true` will be used.
* If no `AdbSubprocessProtocol` is supported, an error will be thrown.
*
* @default [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol]
*/
protocols: AdbSubprocessProtocolConstructor[];
signal?: AbortSignal;
}
const DEFAULT_OPTIONS = {
protocols: [AdbSubprocessShellProtocol, AdbSubprocessNoneProtocol],
} satisfies AdbSubprocessOptions;
export interface AdbSubprocessWaitResult {
stdout: string;
stderr: string;
exitCode: number;
}
export class AdbSubprocess extends AdbCommandBase {
async #createProtocol(
mode: "pty" | "raw",
command?: string | string[],
options?: Partial<AdbSubprocessOptions>,
): Promise<AdbSubprocessProtocol> {
const { protocols, signal } = { ...DEFAULT_OPTIONS, ...options };
let Constructor: AdbSubprocessProtocolConstructor | undefined;
for (const item of protocols) {
// It's async so can't use `Array#find`
if (await item.isSupported(this.adb)) {
Constructor = item;
break;
}
}
if (!Constructor) {
throw new Error("No specified protocol is supported by the device");
}
if (Array.isArray(command)) {
command = command.join(" ");
} else if (command === undefined) {
// spawn the default shell
command = "";
}
return await Constructor[mode](this.adb, command, signal);
}
/**
* Spawns an executable in PTY mode.
*
* Redirection mode is enough for most simple commands, but PTY mode is required for
* commands that manipulate the terminal, such as `vi` and `less`.
* @param command The command to run. If omitted, the default shell will be spawned.
* @param options The options for creating the `AdbSubprocessProtocol`
* @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process.
*/
shell(
command?: string | string[],
options?: Partial<AdbSubprocessOptions>,
): Promise<AdbSubprocessProtocol> {
return this.#createProtocol("pty", command, options);
}
/**
* Spawns an executable and redirect the standard input/output stream.
*
* Redirection mode is enough for most simple commands, but PTY mode is required for
* commands that manipulate the terminal, such as `vi` and `less`.
* @param command The command to run, or an array of strings containing both command and args.
* @param options The options for creating the `AdbSubprocessProtocol`
* @returns A new `AdbSubprocessProtocol` instance connecting to the spawned process.
*/
spawn(
command: string | string[],
options?: Partial<AdbSubprocessOptions>,
): Promise<AdbSubprocessProtocol> {
return this.#createProtocol("raw", command, options);
}
/**
* Spawns a new process, waits until it exits, and returns the entire output.
* @param command The command to run
* @param options The options for creating the `AdbSubprocessProtocol`
* @returns The entire output of the command
*/
async spawnAndWait(
command: string | string[],
options?: Partial<AdbSubprocessOptions>,
): Promise<AdbSubprocessWaitResult> {
const process = await this.spawn(command, options);
const [stdout, stderr, exitCode] = await Promise.all([
process.stdout
.pipeThrough(new TextDecoderStream())
.pipeThrough(new ConcatStringStream()),
process.stderr
.pipeThrough(new TextDecoderStream())
.pipeThrough(new ConcatStringStream()),
process.exit,
]);
return {
stdout,
stderr,
exitCode,
};
}
/**
* Spawns a new process, waits until it exits, and returns the entire output.
* @param command The command to run
* @returns The entire output of the command
*/
async spawnAndWaitLegacy(command: string | string[]): Promise<string> {
const { stdout } = await this.spawnAndWait(command, {
protocols: [AdbSubprocessNoneProtocol],
});
return stdout;
}
}

View file

@ -1,3 +1,4 @@
export * from "./command.js";
export * from "./protocols/index.js";
export * from "./none/index.js";
export * from "./service.js";
export * from "./shell/index.js";
export * from "./utils.js";

View file

@ -0,0 +1,4 @@
export * from "./process.js";
export * from "./pty.js";
export * from "./service.js";
export * from "./spawner.js";

View file

@ -6,15 +6,15 @@ import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";
import type { AdbSocket } from "../../../adb.js";
import { AdbSubprocessNoneProtocol } from "./none.js";
import { AdbNoneProtocolProcessImpl } from "./process.js";
describe("AdbSubprocessNoneProtocol", () => {
describe("stdout", () => {
describe("AdbNoneProtocolProcessImpl", () => {
describe("output", () => {
it("should pipe data from `socket`", async () => {
const closed = new PromiseResolver<void>();
const socket: AdbSocket = {
const closed = new PromiseResolver<undefined>();
const socket = {
service: "",
close: mock.fn(() => {}),
close: () => {},
closed: closed.promise,
readable: new ReadableStream({
async start(controller) {
@ -25,10 +25,10 @@ describe("AdbSubprocessNoneProtocol", () => {
},
}),
writable: new WritableStream(),
};
} satisfies AdbSocket;
const process = new AdbSubprocessNoneProtocol(socket);
const reader = process.stdout.getReader();
const process = new AdbNoneProtocolProcessImpl(socket);
const reader = process.output.getReader();
assert.deepStrictEqual(await reader.read(), {
done: false,
@ -41,8 +41,8 @@ describe("AdbSubprocessNoneProtocol", () => {
});
it("should close when `socket` is closed", async () => {
const closed = new PromiseResolver<void>();
const socket: AdbSocket = {
const closed = new PromiseResolver<undefined>();
const socket = {
service: "",
close: mock.fn(() => {}),
closed: closed.promise,
@ -55,10 +55,10 @@ describe("AdbSubprocessNoneProtocol", () => {
},
}),
writable: new WritableStream(),
};
} satisfies AdbSocket;
const process = new AdbSubprocessNoneProtocol(socket);
const reader = process.stdout.getReader();
const process = new AdbNoneProtocolProcessImpl(socket);
const reader = process.output.getReader();
assert.deepStrictEqual(await reader.read(), {
done: false,
@ -69,37 +69,7 @@ describe("AdbSubprocessNoneProtocol", () => {
value: new Uint8Array([4, 5, 6]),
});
closed.resolve();
assert.deepStrictEqual(await reader.read(), {
done: true,
value: undefined,
});
});
});
describe("stderr", () => {
it("should be empty", async () => {
const closed = new PromiseResolver<void>();
const socket: AdbSocket = {
service: "",
close: mock.fn(() => {}),
closed: closed.promise,
readable: new ReadableStream({
async start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
await closed.promise;
controller.close();
},
}),
writable: new WritableStream(),
};
const process = new AdbSubprocessNoneProtocol(socket);
const reader = process.stderr.getReader();
closed.resolve();
closed.resolve(undefined);
assert.deepStrictEqual(await reader.read(), {
done: true,
@ -110,48 +80,35 @@ describe("AdbSubprocessNoneProtocol", () => {
describe("exit", () => {
it("should resolve when `socket` closes", async () => {
const closed = new PromiseResolver<void>();
const socket: AdbSocket = {
const closed = new PromiseResolver<undefined>();
const socket = {
service: "",
close: mock.fn(() => {}),
close: () => {},
closed: closed.promise,
readable: new ReadableStream(),
writable: new WritableStream(),
};
} satisfies AdbSocket;
const process = new AdbSubprocessNoneProtocol(socket);
const process = new AdbNoneProtocolProcessImpl(socket);
closed.resolve();
closed.resolve(undefined);
assert.strictEqual(await process.exit, 0);
assert.strictEqual(await process.exited, undefined);
});
});
it("`resize` shouldn't throw any error", () => {
const socket: AdbSocket = {
it("`kill` should close `socket`", async () => {
const socket = {
service: "",
close: mock.fn(() => {}),
closed: new PromiseResolver<void>().promise,
closed: new PromiseResolver<undefined>().promise,
readable: new ReadableStream(),
writable: new WritableStream(),
};
} satisfies AdbSocket;
const process = new AdbSubprocessNoneProtocol(socket);
assert.doesNotThrow(() => process.resize());
});
it("`kill` should close `socket`", async () => {
const close = mock.fn(() => {});
const socket: AdbSocket = {
service: "",
close,
closed: new PromiseResolver<void>().promise,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const process = new AdbSubprocessNoneProtocol(socket);
const process = new AdbNoneProtocolProcessImpl(socket);
await process.kill();
assert.deepEqual(close.mock.callCount(), 1);
assert.deepEqual(socket.close.mock.callCount(), 1);
});
});

View file

@ -0,0 +1,56 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import { PromiseResolver } from "@yume-chan/async";
import type {
AbortSignal,
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import type { AdbSocket } from "../../../adb.js";
import type { AdbNoneProtocolProcess } from "./spawner.js";
export class AdbNoneProtocolProcessImpl implements AdbNoneProtocolProcess {
readonly #socket: AdbSocket;
get stdin(): WritableStream<MaybeConsumable<Uint8Array>> {
return this.#socket.writable;
}
get output(): ReadableStream<Uint8Array> {
return this.#socket.readable;
}
readonly #exited: Promise<undefined>;
get exited(): Promise<undefined> {
return this.#exited;
}
constructor(socket: AdbSocket, signal?: AbortSignal) {
this.#socket = socket;
if (signal) {
// `signal` won't affect `this.output`
// So remaining data can still be read
// (call `controller.error` will discard all pending data)
const exited = new PromiseResolver<undefined>();
this.#socket.closed.then(
() => exited.resolve(undefined),
(e) => exited.reject(e),
);
signal.addEventListener("abort", () => {
exited.reject(signal.reason);
this.#socket.close();
});
this.#exited = exited.promise;
} else {
this.#exited = this.#socket.closed;
}
}
kill(): MaybePromiseLike<void> {
return this.#socket.close();
}
}

View file

@ -0,0 +1,45 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type {
ReadableStream,
WritableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import { MaybeConsumable } from "@yume-chan/stream-extra";
import type { AdbSocket } from "../../../adb.js";
import type { AdbPtyProcess } from "../pty.js";
export class AdbNoneProtocolPtyProcess implements AdbPtyProcess<undefined> {
readonly #socket: AdbSocket;
readonly #writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
readonly #input: MaybeConsumable.WritableStream<Uint8Array>;
get input(): WritableStream<MaybeConsumable<Uint8Array>> {
return this.#input;
}
get output(): ReadableStream<Uint8Array> {
return this.#socket.readable;
}
get exited(): Promise<undefined> {
return this.#socket.closed;
}
constructor(socket: AdbSocket) {
this.#socket = socket;
this.#writer = this.#socket.writable.getWriter();
this.#input = new MaybeConsumable.WritableStream<Uint8Array>({
write: (chunk) => this.#writer.write(chunk),
});
}
sigint() {
return this.#writer.write(new Uint8Array([0x03]));
}
kill(): MaybePromiseLike<void> {
return this.#socket.close();
}
}

View file

@ -0,0 +1,42 @@
import type { Adb } from "../../../adb.js";
import { AdbNoneProtocolProcessImpl } from "./process.js";
import { AdbNoneProtocolPtyProcess } from "./pty.js";
import { AdbNoneProtocolSpawner } from "./spawner.js";
export class AdbNoneProtocolSubprocessService extends AdbNoneProtocolSpawner {
readonly #adb: Adb;
get adb(): Adb {
return this.#adb;
}
constructor(adb: Adb) {
super(async (command, signal) => {
// `shell,raw:${command}` also triggers raw mode,
// But is not supported on Android version <7.
const socket = await this.#adb.createSocket(
`exec:${command.join(" ")}`,
);
if (signal?.aborted) {
await socket.close();
throw signal.reason;
}
return new AdbNoneProtocolProcessImpl(socket, signal);
});
this.#adb = adb;
}
async pty(command?: string | string[]): Promise<AdbNoneProtocolPtyProcess> {
if (command === undefined) {
command = "";
} else if (Array.isArray(command)) {
command = command.join(" ");
}
return new AdbNoneProtocolPtyProcess(
await this.#adb.createSocket(`shell:${command}`),
);
}
}

View file

@ -0,0 +1,68 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type {
AbortSignal,
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import {
ConcatBufferStream,
ConcatStringStream,
TextDecoderStream,
} from "@yume-chan/stream-extra";
import { splitCommand } from "../utils.js";
export interface AdbNoneProtocolProcess {
get stdin(): WritableStream<MaybeConsumable<Uint8Array>>;
/**
* Mix of stdout and stderr
*/
get output(): ReadableStream<Uint8Array>;
get exited(): Promise<void>;
kill(): MaybePromiseLike<void>;
}
export class AdbNoneProtocolSpawner {
readonly #spawn: (
command: string[],
signal: AbortSignal | undefined,
) => Promise<AdbNoneProtocolProcess>;
constructor(
spawn: (
command: string[],
signal: AbortSignal | undefined,
) => Promise<AdbNoneProtocolProcess>,
) {
this.#spawn = spawn;
}
spawn(
command: string | string[],
signal?: AbortSignal,
): Promise<AdbNoneProtocolProcess> {
signal?.throwIfAborted();
if (typeof command === "string") {
command = splitCommand(command);
}
return this.#spawn(command, signal);
}
async spawnWait(command: string | string[]): Promise<Uint8Array> {
const process = await this.spawn(command);
return await process.output.pipeThrough(new ConcatBufferStream());
}
async spawnWaitText(command: string | string[]): Promise<string> {
const process = await this.spawn(command);
return await process.output
.pipeThrough(new TextDecoderStream())
.pipeThrough(new ConcatStringStream());
}
}

View file

@ -1,3 +0,0 @@
export * from "./none.js";
export * from "./shell.js";
export * from "./types.js";

View file

@ -1,90 +0,0 @@
import type {
AbortSignal,
MaybeConsumable,
WritableStream,
} from "@yume-chan/stream-extra";
import { ReadableStream } from "@yume-chan/stream-extra";
import type { Adb, AdbSocket } from "../../../adb.js";
import type { AdbSubprocessProtocol } from "./types.js";
/**
* The legacy shell
*
* Features:
* * `stderr`: No
* * `exit` exit code: No
* * `resize`: No
*/
export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol {
static isSupported() {
return true;
}
static async pty(adb: Adb, command: string, signal?: AbortSignal) {
return new AdbSubprocessNoneProtocol(
await adb.createSocket(`shell:${command}`),
signal,
);
}
static async raw(adb: Adb, command: string, signal?: AbortSignal) {
// `shell,raw:${command}` also triggers raw mode,
// But is not supported on Android version <7.
return new AdbSubprocessNoneProtocol(
await adb.createSocket(`exec:${command}`),
signal,
);
}
readonly #socket: AdbSocket;
// Legacy shell forwards all data to stdin.
get stdin(): WritableStream<MaybeConsumable<Uint8Array>> {
return this.#socket.writable;
}
/**
* Legacy shell mixes stdout and stderr.
*/
get stdout(): ReadableStream<Uint8Array> {
return this.#socket.readable;
}
#stderr: ReadableStream<Uint8Array>;
/**
* `stderr` will always be empty.
*/
get stderr(): ReadableStream<Uint8Array> {
return this.#stderr;
}
#exit: Promise<number>;
get exit() {
return this.#exit;
}
constructor(socket: AdbSocket, signal?: AbortSignal) {
signal?.throwIfAborted();
this.#socket = socket;
signal?.addEventListener("abort", () => void this.kill());
this.#stderr = new ReadableStream({
start: async (controller) => {
await this.#socket.closed;
controller.close();
},
});
this.#exit = socket.closed.then(() => 0);
}
resize() {
// Not supported, but don't throw.
}
async kill() {
await this.#socket.close();
}
}

View file

@ -1,177 +0,0 @@
import { PromiseResolver } from "@yume-chan/async";
import type {
AbortSignal,
PushReadableStreamController,
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
MaybeConsumable,
PushReadableStream,
StructDeserializeStream,
WritableStream,
} from "@yume-chan/stream-extra";
import type { StructValue } from "@yume-chan/struct";
import { buffer, struct, u32, u8 } from "@yume-chan/struct";
import type { Adb, AdbSocket } from "../../../adb.js";
import { AdbFeature } from "../../../features.js";
import { encodeUtf8 } from "../../../utils/index.js";
import type { AdbSubprocessProtocol } from "./types.js";
export const AdbShellProtocolId = {
Stdin: 0,
Stdout: 1,
Stderr: 2,
Exit: 3,
CloseStdin: 4,
WindowSizeChange: 5,
} as const;
export type AdbShellProtocolId =
(typeof AdbShellProtocolId)[keyof typeof AdbShellProtocolId];
// This packet format is used in both directions.
export const AdbShellProtocolPacket = struct(
{
id: u8<AdbShellProtocolId>(),
data: buffer(u32),
},
{ littleEndian: true },
);
type AdbShellProtocolPacket = StructValue<typeof AdbShellProtocolPacket>;
/**
* Shell v2 a.k.a Shell Protocol
*
* Features:
* * `stderr`: Yes
* * `exit` exit code: Yes
* * `resize`: Yes
*/
export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol {
static isSupported(adb: Adb) {
return adb.canUseFeature(AdbFeature.ShellV2);
}
static async pty(adb: Adb, command: string, signal?: AbortSignal) {
// TODO: AdbShellSubprocessProtocol: Support setting `XTERM` environment variable
return new AdbSubprocessShellProtocol(
await adb.createSocket(`shell,v2,pty:${command}`),
signal,
);
}
static async raw(adb: Adb, command: string, signal?: AbortSignal) {
return new AdbSubprocessShellProtocol(
await adb.createSocket(`shell,v2,raw:${command}`),
signal,
);
}
readonly #socket: AdbSocket;
#writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
#stdin: WritableStream<MaybeConsumable<Uint8Array>>;
get stdin() {
return this.#stdin;
}
#stdout: ReadableStream<Uint8Array>;
get stdout() {
return this.#stdout;
}
#stderr: ReadableStream<Uint8Array>;
get stderr() {
return this.#stderr;
}
readonly #exit = new PromiseResolver<number>();
get exit() {
return this.#exit.promise;
}
constructor(socket: AdbSocket, signal?: AbortSignal) {
signal?.throwIfAborted();
this.#socket = socket;
signal?.addEventListener("abort", () => void this.kill());
let stdoutController!: PushReadableStreamController<Uint8Array>;
let stderrController!: PushReadableStreamController<Uint8Array>;
this.#stdout = new PushReadableStream<Uint8Array>((controller) => {
stdoutController = controller;
});
this.#stderr = new PushReadableStream<Uint8Array>((controller) => {
stderrController = controller;
});
socket.readable
.pipeThrough(new StructDeserializeStream(AdbShellProtocolPacket))
.pipeTo(
new WritableStream<AdbShellProtocolPacket>({
write: async (chunk) => {
switch (chunk.id) {
case AdbShellProtocolId.Exit:
this.#exit.resolve(chunk.data[0]!);
break;
case AdbShellProtocolId.Stdout:
await stdoutController.enqueue(chunk.data);
break;
case AdbShellProtocolId.Stderr:
await stderrController.enqueue(chunk.data);
break;
}
},
}),
)
.then(
() => {
stdoutController.close();
stderrController.close();
// If `#exit` has already resolved, this will be a no-op
this.#exit.reject(
new Error("Socket ended without exit message"),
);
},
(e) => {
stdoutController.error(e);
stderrController.error(e);
// If `#exit` has already resolved, this will be a no-op
this.#exit.reject(e);
},
);
this.#writer = this.#socket.writable.getWriter();
this.#stdin = new MaybeConsumable.WritableStream<Uint8Array>({
write: async (chunk) => {
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdin,
data: chunk,
}),
);
},
});
}
async resize(rows: number, cols: number) {
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.WindowSizeChange,
// The "correct" format is `${rows}x${cols},${x_pixels}x${y_pixels}`
// However, according to https://linux.die.net/man/4/tty_ioctl
// `x_pixels` and `y_pixels` are unused, so always sending `0` should be fine.
data: encodeUtf8(`${rows}x${cols},0x0\0`),
}),
);
}
kill() {
return this.#socket.close();
}
}

View file

@ -1,72 +0,0 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type {
AbortSignal,
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import type { Adb, AdbSocket } from "../../../adb.js";
export interface AdbSubprocessProtocol {
/**
* A WritableStream that writes to the `stdin` stream.
*/
readonly stdin: WritableStream<MaybeConsumable<Uint8Array>>;
/**
* The `stdout` stream of the process.
*/
readonly stdout: ReadableStream<Uint8Array>;
/**
* The `stderr` stream of the process.
*
* Note: Some `AdbSubprocessProtocol` doesn't separate `stdout` and `stderr`,
* All output will be sent to `stdout`.
*/
readonly stderr: ReadableStream<Uint8Array>;
/**
* A `Promise` that resolves to the exit code of the process.
*
* Note: Some `AdbSubprocessProtocol` doesn't support exit code,
* They will always resolve it with `0`.
*/
readonly exit: Promise<number>;
/**
* Resizes the current shell.
*
* Some `AdbSubprocessProtocol`s may not support resizing
* and will ignore calls to this method.
*/
resize(rows: number, cols: number): MaybePromiseLike<void>;
/**
* Kills the current process.
*/
kill(): MaybePromiseLike<void>;
}
export interface AdbSubprocessProtocolConstructor {
/** Returns `true` if the `adb` instance supports this shell */
isSupported(adb: Adb): MaybePromiseLike<boolean>;
/** Spawns an executable in PTY (interactive) mode. */
pty: (
adb: Adb,
command: string,
signal?: AbortSignal,
) => MaybePromiseLike<AdbSubprocessProtocol>;
/** Spawns an executable and pipe the output. */
raw(
adb: Adb,
command: string,
signal?: AbortSignal,
): MaybePromiseLike<AdbSubprocessProtocol>;
/** Creates a new `AdbShell` by attaching to an exist `AdbSocket` */
new (socket: AdbSocket): AdbSubprocessProtocol;
}

View file

@ -0,0 +1,15 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type {
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
export interface AdbPtyProcess<TExitCode> {
get input(): WritableStream<MaybeConsumable<Uint8Array>>;
get output(): ReadableStream<Uint8Array>;
get exited(): Promise<TExitCode>;
sigint(): Promise<void>;
kill(): MaybePromiseLike<void>;
}

View file

@ -0,0 +1,32 @@
import type { Adb } from "../../adb.js";
import { AdbFeature } from "../../features.js";
import { AdbNoneProtocolSubprocessService } from "./none/index.js";
import { AdbShellProtocolSubprocessService } from "./shell/index.js";
export class AdbSubprocessService {
#adb: Adb;
get adb() {
return this.#adb;
}
#noneProtocol: AdbNoneProtocolSubprocessService;
get noneProtocol(): AdbNoneProtocolSubprocessService {
return this.#noneProtocol;
}
#shellProtocol?: AdbShellProtocolSubprocessService;
get shellProtocol(): AdbShellProtocolSubprocessService | undefined {
return this.#shellProtocol;
}
constructor(adb: Adb) {
this.#adb = adb;
this.#noneProtocol = new AdbNoneProtocolSubprocessService(adb);
if (adb.canUseFeature(AdbFeature.ShellV2)) {
this.#shellProtocol = new AdbShellProtocolSubprocessService(adb);
}
}
}

View file

@ -0,0 +1,5 @@
export * from "./process.js";
export * from "./pty.js";
export * from "./service.js";
export * from "./shared.js";
export * from "./spawner.js";

View file

@ -7,16 +7,13 @@ import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";
import type { AdbSocket } from "../../../adb.js";
import {
AdbShellProtocolId,
AdbShellProtocolPacket,
AdbSubprocessShellProtocol,
} from "./shell.js";
import { AdbShellProtocolProcessImpl } from "./process.js";
import { AdbShellProtocolId, AdbShellProtocolPacket } from "./shared.js";
function createMockSocket(
readable: (controller: ReadableStreamDefaultController<Uint8Array>) => void,
): [AdbSocket, PromiseResolver<void>] {
const closed = new PromiseResolver<void>();
): [AdbSocket, PromiseResolver<undefined>] {
const closed = new PromiseResolver<undefined>();
const socket: AdbSocket = {
service: "",
close() {},
@ -51,24 +48,12 @@ async function assertResolves<T>(promise: Promise<T>, expected: T) {
return assert.deepStrictEqual(await promise, expected);
}
describe("AdbShellProtocolPacket", () => {
it("should serialize", () => {
assert.deepStrictEqual(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdout,
data: new Uint8Array([1, 2, 3, 4]),
}),
new Uint8Array([1, 4, 0, 0, 0, 1, 2, 3, 4]),
);
});
});
describe("AdbSubprocessShellProtocol", () => {
describe("AdbShellProtocolProcessImpl", () => {
describe("`stdout` and `stderr`", () => {
it("should parse data from `socket", () => {
const [socket] = createMockSocket(() => {});
const process = new AdbSubprocessShellProtocol(socket);
const process = new AdbShellProtocolProcessImpl(socket);
const stdoutReader = process.stdout.getReader();
const stderrReader = process.stderr.getReader();
@ -98,12 +83,12 @@ describe("AdbSubprocessShellProtocol", () => {
);
});
const process = new AdbSubprocessShellProtocol(socket);
const process = new AdbShellProtocolProcessImpl(socket);
const stdoutReader = process.stdout.getReader();
const stderrReader = process.stderr.getReader();
await stdoutReader.cancel();
closed.resolve();
closed.resolve(undefined);
assertResolves(stderrReader.read(), {
done: false,
@ -118,7 +103,7 @@ describe("AdbSubprocessShellProtocol", () => {
describe("`socket` close", () => {
describe("with `exit` message", () => {
it("should close `stdout`, `stderr` and resolve `exit`", async () => {
it("should close `stdout`, `stderr` and resolve `exited`", async () => {
const [socket, closed] = createMockSocket((controller) => {
controller.enqueue(
AdbShellProtocolPacket.serialize({
@ -129,7 +114,7 @@ describe("AdbSubprocessShellProtocol", () => {
controller.close();
});
const process = new AdbSubprocessShellProtocol(socket);
const process = new AdbShellProtocolProcessImpl(socket);
const stdoutReader = process.stdout.getReader();
const stderrReader = process.stderr.getReader();
@ -142,7 +127,7 @@ describe("AdbSubprocessShellProtocol", () => {
value: new Uint8Array([4, 5, 6]),
});
closed.resolve();
closed.resolve(undefined);
assertResolves(stdoutReader.read(), {
done: true,
@ -152,17 +137,17 @@ describe("AdbSubprocessShellProtocol", () => {
done: true,
value: undefined,
});
assert.strictEqual(await process.exit, 42);
assert.strictEqual(await process.exited, 42);
});
});
describe("with no `exit` message", () => {
it("should close `stdout`, `stderr` and reject `exit`", async () => {
it("should close `stdout`, `stderr` and reject `exited`", async () => {
const [socket, closed] = createMockSocket((controller) => {
controller.close();
});
const process = new AdbSubprocessShellProtocol(socket);
const process = new AdbShellProtocolProcessImpl(socket);
const stdoutReader = process.stdout.getReader();
const stderrReader = process.stderr.getReader();
@ -175,7 +160,7 @@ describe("AdbSubprocessShellProtocol", () => {
value: new Uint8Array([4, 5, 6]),
});
closed.resolve();
closed.resolve(undefined);
await Promise.all([
assertResolves(stdoutReader.read(), {
@ -186,20 +171,20 @@ describe("AdbSubprocessShellProtocol", () => {
done: true,
value: undefined,
}),
assert.rejects(process.exit),
assert.rejects(process.exited),
]);
});
});
});
describe("`socket.readable` invalid data", () => {
it("should error `stdout`, `stderr` and reject `exit`", async () => {
it("should error `stdout`, `stderr` and reject `exited`", async () => {
const [socket, closed] = createMockSocket((controller) => {
controller.enqueue(new Uint8Array([7, 8, 9]));
controller.close();
});
const process = new AdbSubprocessShellProtocol(socket);
const process = new AdbShellProtocolProcessImpl(socket);
const stdoutReader = process.stdout.getReader();
const stderrReader = process.stderr.getReader();
@ -212,12 +197,12 @@ describe("AdbSubprocessShellProtocol", () => {
value: new Uint8Array([4, 5, 6]),
});
closed.resolve();
closed.resolve(undefined);
await Promise.all([
assert.rejects(stdoutReader.read()),
assert.rejects(stderrReader.read()),
assert.rejects(process.exit),
assert.rejects(process.exited),
]);
});
});

View file

@ -0,0 +1,127 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import { PromiseResolver } from "@yume-chan/async";
import type {
AbortSignal,
PushReadableStreamController,
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
MaybeConsumable,
PushReadableStream,
StructDeserializeStream,
WritableStream,
} from "@yume-chan/stream-extra";
import type { AdbSocket } from "../../../adb.js";
import { AdbShellProtocolId, AdbShellProtocolPacket } from "./shared.js";
import type { AdbShellProtocolProcess } from "./spawner.js";
export class AdbShellProtocolProcessImpl implements AdbShellProtocolProcess {
readonly #socket: AdbSocket;
readonly #writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
readonly #stdin: WritableStream<MaybeConsumable<Uint8Array>>;
get stdin() {
return this.#stdin;
}
readonly #stdout: ReadableStream<Uint8Array>;
get stdout() {
return this.#stdout;
}
readonly #stderr: ReadableStream<Uint8Array>;
get stderr() {
return this.#stderr;
}
readonly #exited: Promise<number>;
get exited() {
return this.#exited;
}
constructor(socket: AdbSocket, signal?: AbortSignal) {
this.#socket = socket;
let stdoutController!: PushReadableStreamController<Uint8Array>;
let stderrController!: PushReadableStreamController<Uint8Array>;
this.#stdout = new PushReadableStream<Uint8Array>((controller) => {
stdoutController = controller;
});
this.#stderr = new PushReadableStream<Uint8Array>((controller) => {
stderrController = controller;
});
const exited = new PromiseResolver<number>();
this.#exited = exited.promise;
socket.readable
.pipeThrough(new StructDeserializeStream(AdbShellProtocolPacket))
.pipeTo(
new WritableStream<AdbShellProtocolPacket>({
write: async (chunk) => {
switch (chunk.id) {
case AdbShellProtocolId.Exit:
exited.resolve(chunk.data[0]!);
break;
case AdbShellProtocolId.Stdout:
await stdoutController.enqueue(chunk.data);
break;
case AdbShellProtocolId.Stderr:
await stderrController.enqueue(chunk.data);
break;
default:
// Ignore unknown messages like Google ADB does
// https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/adb/daemon/shell_service.cpp;l=684;drc=61197364367c9e404c7da6900658f1b16c42d0da
break;
}
},
}),
)
.then(
() => {
stdoutController.close();
stderrController.close();
// If `exited` has already settled, this will be a no-op
exited.reject(
new Error("Socket ended without exit message"),
);
},
(e) => {
stdoutController.error(e);
stderrController.error(e);
// If `exited` has already settled, this will be a no-op
exited.reject(e);
},
);
if (signal) {
// `signal` won't affect `this.stdout` and `this.stderr`
// So remaining data can still be read
// (call `controller.error` will discard all pending data)
signal.addEventListener("abort", () => {
exited.reject(signal.reason);
this.#socket.close();
});
}
this.#writer = this.#socket.writable.getWriter();
this.#stdin = new MaybeConsumable.WritableStream<Uint8Array>({
write: async (chunk) => {
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdin,
data: chunk,
}),
);
},
});
}
kill(): MaybePromiseLike<void> {
return this.#socket.close();
}
}

View file

@ -0,0 +1,112 @@
import { PromiseResolver } from "@yume-chan/async";
import type {
PushReadableStreamController,
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
MaybeConsumable,
PushReadableStream,
StructDeserializeStream,
WritableStream,
} from "@yume-chan/stream-extra";
import { encodeUtf8 } from "@yume-chan/struct";
import type { AdbSocket } from "../../../adb.js";
import type { AdbPtyProcess } from "../pty.js";
import { AdbShellProtocolId, AdbShellProtocolPacket } from "./shared.js";
export class AdbShellProtocolPtyProcess implements AdbPtyProcess<number> {
readonly #socket: AdbSocket;
readonly #writer: WritableStreamDefaultWriter<MaybeConsumable<Uint8Array>>;
readonly #input: WritableStream<MaybeConsumable<Uint8Array>>;
get input() {
return this.#input;
}
readonly #stdout: ReadableStream<Uint8Array>;
get output() {
return this.#stdout;
}
readonly #exited = new PromiseResolver<number>();
get exited() {
return this.#exited.promise;
}
constructor(socket: AdbSocket) {
this.#socket = socket;
let stdoutController!: PushReadableStreamController<Uint8Array>;
this.#stdout = new PushReadableStream<Uint8Array>((controller) => {
stdoutController = controller;
});
socket.readable
.pipeThrough(new StructDeserializeStream(AdbShellProtocolPacket))
.pipeTo(
new WritableStream<AdbShellProtocolPacket>({
write: async (chunk) => {
switch (chunk.id) {
case AdbShellProtocolId.Exit:
this.#exited.resolve(chunk.data[0]!);
break;
case AdbShellProtocolId.Stdout:
await stdoutController.enqueue(chunk.data);
break;
}
},
}),
)
.then(
() => {
stdoutController.close();
// If `#exit` has already resolved, this will be a no-op
this.#exited.reject(
new Error("Socket ended without exit message"),
);
},
(e) => {
stdoutController.error(e);
// If `#exit` has already resolved, this will be a no-op
this.#exited.reject(e);
},
);
this.#writer = this.#socket.writable.getWriter();
this.#input = new MaybeConsumable.WritableStream<Uint8Array>({
write: (chunk) => this.#writeStdin(chunk),
});
}
#writeStdin(chunk: Uint8Array) {
return this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdin,
data: chunk,
}),
);
}
async resize(rows: number, cols: number) {
await this.#writer.write(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.WindowSizeChange,
// The "correct" format is `${rows}x${cols},${x_pixels}x${y_pixels}`
// However, according to https://linux.die.net/man/4/tty_ioctl
// `x_pixels` and `y_pixels` are unused, so always sending `0` should be fine.
data: encodeUtf8(`${rows}x${cols},0x0\0`),
}),
);
}
sigint() {
return this.#writeStdin(new Uint8Array([0x03]));
}
kill() {
return this.#socket.close();
}
}

View file

@ -0,0 +1,58 @@
import type { Adb } from "../../../adb.js";
import { AdbFeature } from "../../../features.js";
import { AdbShellProtocolProcessImpl } from "./process.js";
import { AdbShellProtocolPtyProcess } from "./pty.js";
import { AdbShellProtocolSpawner } from "./spawner.js";
export class AdbShellProtocolSubprocessService extends AdbShellProtocolSpawner {
readonly #adb: Adb;
get adb() {
return this.#adb;
}
get isSupported() {
return this.#adb.canUseFeature(AdbFeature.ShellV2);
}
constructor(adb: Adb) {
super(async (command, signal) => {
const socket = await this.#adb.createSocket(
`shell,v2,raw:${command.join(" ")}`,
);
if (signal?.aborted) {
await socket.close();
throw signal.reason;
}
return new AdbShellProtocolProcessImpl(socket, signal);
});
this.#adb = adb;
}
async pty(options?: {
command?: string | string[] | undefined;
terminalType?: string;
}): Promise<AdbShellProtocolPtyProcess> {
let service = "shell,v2,pty";
if (options?.terminalType) {
service += `,TERM=` + options.terminalType;
}
service += ":";
if (options) {
if (typeof options.command === "string") {
service += options.command;
} else if (Array.isArray(options.command)) {
service += options.command.join(" ");
}
}
return new AdbShellProtocolPtyProcess(
await this.#adb.createSocket(service),
);
}
}

View file

@ -0,0 +1,16 @@
import assert from "node:assert";
import { describe, it } from "node:test";
import { AdbShellProtocolId, AdbShellProtocolPacket } from "./shared.js";
describe("AdbShellProtocolPacket", () => {
it("should serialize", () => {
assert.deepStrictEqual(
AdbShellProtocolPacket.serialize({
id: AdbShellProtocolId.Stdout,
data: new Uint8Array([1, 2, 3, 4]),
}),
new Uint8Array([1, 4, 0, 0, 0, 1, 2, 3, 4]),
);
});
});

View file

@ -0,0 +1,25 @@
import type { StructValue } from "@yume-chan/struct";
import { buffer, struct, u32, u8 } from "@yume-chan/struct";
export const AdbShellProtocolId = {
Stdin: 0,
Stdout: 1,
Stderr: 2,
Exit: 3,
CloseStdin: 4,
WindowSizeChange: 5,
} as const;
export type AdbShellProtocolId =
(typeof AdbShellProtocolId)[keyof typeof AdbShellProtocolId];
// This packet format is used in both directions.
export const AdbShellProtocolPacket = struct(
{
id: u8<AdbShellProtocolId>(),
data: buffer(u32),
},
{ littleEndian: true },
);
export type AdbShellProtocolPacket = StructValue<typeof AdbShellProtocolPacket>;

View file

@ -0,0 +1,90 @@
import type { MaybePromiseLike } from "@yume-chan/async";
import type {
AbortSignal,
MaybeConsumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import {
ConcatBufferStream,
ConcatStringStream,
TextDecoderStream,
} from "@yume-chan/stream-extra";
import { splitCommand } from "../utils.js";
export interface AdbShellProtocolProcess {
get stdin(): WritableStream<MaybeConsumable<Uint8Array>>;
get stdout(): ReadableStream<Uint8Array>;
get stderr(): ReadableStream<Uint8Array>;
get exited(): Promise<number>;
kill(): MaybePromiseLike<void>;
}
export class AdbShellProtocolSpawner {
readonly #spawn: (
command: string[],
signal: AbortSignal | undefined,
) => Promise<AdbShellProtocolProcess>;
constructor(
spawn: (
command: string[],
signal: AbortSignal | undefined,
) => Promise<AdbShellProtocolProcess>,
) {
this.#spawn = spawn;
}
spawn(
command: string | string[],
signal?: AbortSignal,
): Promise<AdbShellProtocolProcess> {
signal?.throwIfAborted();
if (typeof command === "string") {
command = splitCommand(command);
}
return this.#spawn(command, signal);
}
async spawnWait(
command: string | string[],
): Promise<AdbShellProtocolSpawner.WaitResult<Uint8Array>> {
const process = await this.spawn(command);
const [stdout, stderr, exitCode] = await Promise.all([
process.stdout.pipeThrough(new ConcatBufferStream()),
process.stderr.pipeThrough(new ConcatBufferStream()),
process.exited,
]);
return { stdout, stderr, exitCode };
}
async spawnWaitText(
command: string | string[],
): Promise<AdbShellProtocolSpawner.WaitResult<string>> {
const process = await this.spawn(command);
const [stdout, stderr, exitCode] = await Promise.all([
process.stdout
.pipeThrough(new TextDecoderStream())
.pipeThrough(new ConcatStringStream()),
process.stderr
.pipeThrough(new TextDecoderStream())
.pipeThrough(new ConcatStringStream()),
process.exited,
]);
return { stdout, stderr, exitCode };
}
}
export namespace AdbShellProtocolSpawner {
export interface WaitResult<T> {
stdout: T;
stderr: T;
exitCode: number;
}
}

View file

@ -18,3 +18,44 @@ export function escapeArg(s: string) {
result += `'`;
return result;
}
export function splitCommand(command: string): string[] {
const result: string[] = [];
let quote: string | undefined;
let isEscaped = false;
let start = 0;
for (let i = 0, len = command.length; i < len; i += 1) {
if (isEscaped) {
isEscaped = false;
continue;
}
const char = command.charAt(i);
switch (char) {
case " ":
if (!quote && i !== start) {
result.push(command.substring(start, i));
start = i + 1;
}
break;
case "'":
case '"':
if (!quote) {
quote = char;
} else if (char === quote) {
quote = undefined;
}
break;
case "\\":
isEscaped = true;
break;
}
}
if (start < command.length) {
result.push(command.substring(start));
}
return result;
}

View file

@ -14,7 +14,7 @@ describe("AdbSyncSocket", () => {
{
service: "",
close() {},
closed: Promise.resolve(),
closed: Promise.resolve(undefined),
readable: new ReadableStream(),
writable: new WritableStream(),
},

View file

@ -151,7 +151,7 @@ export class AdbSync {
// It may fail if `filename` already exists.
// Ignore the result.
// TODO: sync: test push mkdir workaround (need an Android 8 device)
await this._adb.subprocess.spawnAndWait([
await this._adb.subprocess.noneProtocol.spawnWait([
"mkdir",
"-p",
escapeArg(dirname(options.filename)),

View file

@ -1,4 +1,4 @@
import { AdbCommandBase } from "./base.js";
import { AdbServiceBase } from "./base.js";
/**
* ADB daemon checks for the following properties in the order of
@ -27,7 +27,7 @@ function parsePort(value: string): number | undefined {
return Number.parseInt(value, 10);
}
export class AdbTcpIpCommand extends AdbCommandBase {
export class AdbTcpIpCommand extends AdbServiceBase {
async getListenAddresses(): Promise<AdbTcpIpListenAddresses> {
const serviceListenAddresses = await this.adb.getProp(
"service.adb.listen_addrs",

View file

@ -54,7 +54,7 @@ export class AdbDaemonSocketController
#closed = false;
#closedPromise = new PromiseResolver<void>();
#closedPromise = new PromiseResolver<undefined>();
get closed() {
return this.#closedPromise.promise;
}
@ -170,7 +170,7 @@ export class AdbDaemonSocketController
dispose() {
this.#readableController.close();
this.#closedPromise.resolve();
this.#closedPromise.resolve(undefined);
}
}
@ -200,7 +200,7 @@ export class AdbDaemonSocket implements AdbDaemonSocketInfo, AdbSocket {
return this.#controller.writable;
}
get closed(): Promise<void> {
get closed(): Promise<undefined> {
return this.#controller.closed;
}

View file

@ -458,10 +458,7 @@ export class AdbServerClient {
disconnected,
);
transport.disconnected.then(
() => waitAbortController.abort(),
() => waitAbortController.abort(),
);
void transport.disconnected.finally(() => waitAbortController.abort());
return transport;
}
@ -507,7 +504,7 @@ export namespace AdbServerClient {
export interface ServerConnection
extends ReadableWritablePair<Uint8Array, MaybeConsumable<Uint8Array>>,
Closeable {
get closed(): Promise<void>;
get closed(): Promise<undefined>;
}
export interface ServerConnector {