feat: rename childprocess to subprocess

This commit is contained in:
Simon Chan 2022-02-15 23:18:39 +08:00
parent 443190e22a
commit 1a8891d4fc
22 changed files with 185 additions and 211 deletions

View file

@ -1,5 +1,5 @@
# `@yume-chan/adb-backend-direct-sockets`
Use Direct Sockets API for plugin-free ADB over WiFi connection.
Use [Direct Sockets API](https://wicg.github.io/direct-sockets/) for plugin-free ADB over WiFi connection.
Note: Direct Sockets API is still under developement in Chrome and requries extra command line arguments to enable. This package is not intended to be used in production, thus not published to NPM registry.
Note: Direct Sockets API is still under development in Chrome and requires extra command line arguments to enable. This package is not intended to be used in production, thus not published to NPM registry.

View file

@ -7,8 +7,8 @@ declare global {
readonly remoteAddress: string;
readonly remotePort: number;
readonly readable: ReadableStream;
readonly writable: WritableStream;
readonly readable: ReadableStream<Uint8Array>;
readonly writable: WritableStream<BufferSource>;
}
interface SocketOptions {
@ -45,22 +45,19 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
private socket: TCPSocket | undefined;
private _readablePassthrough = new TransformStream<Uint8Array, ArrayBuffer>({
private _readableTransformStream = new TransformStream<Uint8Array, ArrayBuffer>({
transform(chunk, controller) {
// Although spec didn't say,
// the chunk always has `byteOffset` of 0 and `byteLength` same as its buffer
controller.enqueue(chunk.buffer);
},
});
public get readable(): ReadableStream<ArrayBuffer> {
return this._readablePassthrough.readable;
return this._readableTransformStream.readable;
}
private _writablePassthrough = new TransformStream<ArrayBuffer, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(new Uint8Array(chunk));
},
});
public get writable(): WritableStream<ArrayBuffer> {
return this._writablePassthrough.writable;
public get writable(): WritableStream<ArrayBuffer> | undefined {
return this.socket?.writable;
}
private _connected = false;
@ -84,8 +81,13 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
});
this.socket = socket;
this.socket.readable.pipeTo(this._readablePassthrough.writable);
this._writablePassthrough.readable.pipeTo(this.socket.writable);
this.socket.readable
.pipeThrough(new TransformStream<Uint8Array, Uint8Array>({
flush: () => {
this.disconnectEvent.fire();
},
}))
.pipeTo(this._readableTransformStream.writable);
this._connected = true;
}

View file

@ -103,6 +103,9 @@ export class AdbWebUsbBackend implements AdbBackend {
cancel: () => {
this.dispose();
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
if (this._writable !== undefined) {
this._connected = true;
@ -117,6 +120,9 @@ export class AdbWebUsbBackend implements AdbBackend {
close: () => {
this.dispose();
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
if (this.readable !== undefined) {
this._connected = true;

View file

@ -4,4 +4,6 @@ Backend for `@yume-chan/adb` using WebSocket API.
Requires WebSockify softwares to bridge the connection between TCP (ADB over Wi-Fi) and WebSocket.
Note: This package only demostrate the possiblility. It's not intended to be used in production, thus not published to NPM registry.
**WARNING:** WebSocket is an unreliable protocol! When send buffer is full, it will throw away any new-coming data, or even close the connection.
Note: This package only demonstrate the possibility. It's not intended to be used in production, thus not published to NPM registry.

View file

@ -1,4 +1,4 @@
import { AdbBackend, BufferedStream, EventQueue, Stream } from '@yume-chan/adb';
import { AdbBackend, ReadableStream, WritableStream } from '@yume-chan/adb';
import { PromiseResolver } from '@yume-chan/async';
import { EventEmitter } from '@yume-chan/event';
@ -20,7 +20,11 @@ export default class AdbWsBackend implements AdbBackend {
private socket: WebSocket | undefined;
private bufferedStream: BufferedStream<Stream> | undefined;
private _readable: ReadableStream<ArrayBuffer> | undefined;
public get readable() { return this._readable; }
private _writable: WritableStream<ArrayBuffer> | undefined;
public get writable() { return this._writable; }
private _connected = false;
public get connected() { return this._connected; }
@ -44,20 +48,31 @@ export default class AdbWsBackend implements AdbBackend {
};
await resolver.promise;
const queue = new EventQueue<ArrayBuffer>();
socket.onmessage = ({ data }: { data: ArrayBuffer; }) => {
queue.enqueue(data, data.byteLength);
};
socket.onclose = () => {
queue.end();
this._connected = false;
this.disconnectEvent.fire();
};
this._readable = new ReadableStream({
start: (controller) => {
socket.onmessage = ({ data }: { data: ArrayBuffer; }) => {
controller.enqueue(data);
};
socket.onclose = () => {
controller.close();
this._connected = false;
this.disconnectEvent.fire();
};
}
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
this._writable = new WritableStream({
write: (chunk) => {
socket.send(chunk);
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
this.socket = socket;
this.bufferedStream = new BufferedStream({
read() { return queue.dequeue(); },
});
this._connected = true;
}
@ -73,10 +88,6 @@ export default class AdbWsBackend implements AdbBackend {
this.socket?.send(buffer);
}
public read(length: number): ArrayBuffer | Promise<ArrayBuffer> {
return this.bufferedStream!.read(length);
}
public dispose(): void | Promise<void> {
this.socket?.close();
}

View file

@ -2,7 +2,7 @@ import { PromiseResolver } from '@yume-chan/async';
import { DisposableList } from '@yume-chan/event';
import { AdbAuthenticationHandler, AdbCredentialStore, AdbDefaultAuthenticators } from './auth';
import { AdbBackend } from './backend';
import { AdbChildProcess, AdbFrameBuffer, AdbPower, AdbReverseCommand, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install } from './commands';
import { AdbSubprocess, AdbFrameBuffer, AdbPower, AdbReverseCommand, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install } from './commands';
import { AdbFeatures } from './features';
import { AdbCommand } from './packet';
import { AdbLogger, AdbPacketDispatcher, AdbSocket } from './socket';
@ -44,7 +44,7 @@ export class Adb {
private _features: AdbFeatures[] | undefined;
public get features() { return this._features; }
public readonly childProcess: AdbChildProcess;
public readonly subprocess: AdbSubprocess;
public readonly power: AdbPower;
public readonly reverse: AdbReverseCommand;
public readonly tcpip: AdbTcpIpCommand;
@ -53,7 +53,7 @@ export class Adb {
this._backend = backend;
this.packetDispatcher = new AdbPacketDispatcher(backend, logger);
this.childProcess = new AdbChildProcess(this);
this.subprocess = new AdbSubprocess(this);
this.power = new AdbPower(this);
this.reverse = new AdbReverseCommand(this.packetDispatcher);
this.tcpip = new AdbTcpIpCommand(this);
@ -194,24 +194,21 @@ export class Adb {
}
public async getProp(key: string): Promise<string> {
const stdout = await this.childProcess.spawnAndWaitLegacy(
const stdout = await this.subprocess.spawnAndWaitLegacy(
['getprop', key]
);
return stdout.trim();
}
public async rm(...filenames: string[]): Promise<string> {
const stdout = await this.childProcess.spawnAndWaitLegacy(
const stdout = await this.subprocess.spawnAndWaitLegacy(
['rm', '-rf', ...filenames.map(arg => escapeArg(arg))],
);
return stdout;
}
public async install(
apk: ReadableStream<ArrayBuffer>,
onProgress?: (uploaded: number) => void,
): Promise<void> {
return await install(this, apk, onProgress);
public async install(apk: ReadableStream<ArrayBuffer>): Promise<void> {
return await install(this, apk);
}
public async sync(): Promise<AdbSync> {

View file

@ -3,6 +3,6 @@ export * from './framebuffer';
export * from './install';
export * from './power';
export * from './reverse';
export * from './shell';
export * from './subprocess';
export * from './sync';
export * from './tcpip';

View file

@ -1,22 +1,21 @@
import { Adb } from "../adb";
import { ReadableStream } from "../utils";
import { escapeArg } from "./shell";
import { escapeArg } from "./subprocess";
export async function install(
adb: Adb,
apk: ReadableStream<ArrayBuffer>,
onProgress?: (uploaded: number) => void,
): Promise<void> {
const filename = `/data/local/tmp/${Math.random().toString().substring(2)}.apk`;
// Upload apk file to tmp folder
const sync = await adb.sync();
const writable = sync.write(filename, undefined, undefined, onProgress);
const writable = sync.write(filename, undefined, undefined);
await apk.pipeTo(writable);
sync.dispose();
// Invoke `pm install` to install it
await adb.childProcess.spawnAndWaitLegacy(['pm', 'install', escapeArg(filename)]);
await adb.subprocess.spawnAndWaitLegacy(['pm', 'install', escapeArg(filename)]);
// Remove the temp file
await adb.rm(filename);

View file

@ -36,11 +36,11 @@ export class AdbPower extends AdbCommandBase {
}
public powerOff() {
return this.adb.childProcess.spawnAndWaitLegacy(['reboot', '-p']);
return this.adb.subprocess.spawnAndWaitLegacy(['reboot', '-p']);
}
public powerButton(longPress: boolean = false) {
return this.adb.childProcess.spawnAndWaitLegacy(['input', 'keyevent', longPress ? '--longpress POWER' : 'POWER']);
return this.adb.subprocess.spawnAndWaitLegacy(['input', 'keyevent', longPress ? '--longpress POWER' : 'POWER']);
}
/**

View file

@ -1,15 +1,15 @@
import type { Adb } from '../../adb';
import { decodeUtf8, WritableStream } from "../../utils";
import { AdbLegacyShell } from './legacy';
import { AdbShellProtocol } from './protocol';
import type { AdbShell, AdbShellConstructor } from './types';
import { AdbNoneSubprocessProtocol } from './legacy';
import { AdbShellSubprocessProtocol } from './protocol';
import type { AdbSubprocessProtocol, AdbSubprocessProtocolConstructor } from './types';
export * from './legacy';
export * from './protocol';
export * from './types';
export * from './utils';
export interface AdbChildProcessOptions {
export interface AdbSubprocessOptions {
/**
* A list of `AdbShellConstructor`s to be used.
*
@ -21,42 +21,42 @@ export interface AdbChildProcessOptions {
*
* The default value is `[AdbShellProtocol, AdbLegacyShell]`.
*/
shells: AdbShellConstructor[];
protocols: AdbSubprocessProtocolConstructor[];
}
const DefaultOptions: AdbChildProcessOptions = {
shells: [AdbShellProtocol, AdbLegacyShell],
const DefaultOptions: AdbSubprocessOptions = {
protocols: [AdbShellSubprocessProtocol, AdbNoneSubprocessProtocol],
};
export interface ChildProcessResult {
export interface SubprocessResult {
stdout: string;
stderr: string;
exitCode: number;
}
export class AdbChildProcess {
export class AdbSubprocess {
public readonly adb: Adb;
public constructor(adb: Adb) {
this.adb = adb;
}
private async createShell(command: string, options?: Partial<AdbChildProcessOptions>): Promise<AdbShell> {
let { shells } = { ...DefaultOptions, ...options };
private async createProtocol(command: string, options?: Partial<AdbSubprocessOptions>): Promise<AdbSubprocessProtocol> {
let { protocols } = { ...DefaultOptions, ...options };
let shell: AdbShellConstructor | undefined;
for (const item of shells) {
let Constructor: AdbSubprocessProtocolConstructor | undefined;
for (const item of protocols) {
if (await item.isSupported(this.adb)) {
shell = item;
Constructor = item;
break;
}
}
if (!shell) {
throw new Error('No specified shell is supported by the device');
if (!Constructor) {
throw new Error('No specified protocol is supported by the device');
}
return await shell.spawn(this.adb, command);
return await Constructor.spawn(this.adb, command);
}
/**
@ -64,8 +64,8 @@ export class AdbChildProcess {
* @param options The options for creating the `AdbShell`
* @returns A new `AdbShell` instance connecting to the spawned shell process.
*/
public shell(options?: Partial<AdbChildProcessOptions>): Promise<AdbShell> {
return this.createShell('', options);
public shell(options?: Partial<AdbSubprocessOptions>): Promise<AdbSubprocessProtocol> {
return this.createProtocol('', options);
}
/**
@ -74,11 +74,11 @@ export class AdbChildProcess {
* @param options The options for creating the `AdbShell`
* @returns A new `AdbShell` instance connecting to the spawned process.
*/
public spawn(command: string | string[], options?: Partial<AdbChildProcessOptions>): Promise<AdbShell> {
public spawn(command: string | string[], options?: Partial<AdbSubprocessOptions>): Promise<AdbSubprocessProtocol> {
if (Array.isArray(command)) {
command = command.join(' ');
}
return this.createShell(command, options);
return this.createProtocol(command, options);
}
/**
@ -87,7 +87,7 @@ export class AdbChildProcess {
* @param options The options for creating the `AdbShell`
* @returns The entire output of the command
*/
public async spawnAndWait(command: string | string[], options?: Partial<AdbChildProcessOptions>): Promise<ChildProcessResult> {
public async spawnAndWait(command: string | string[], options?: Partial<AdbSubprocessOptions>): Promise<SubprocessResult> {
const shell = await this.spawn(command, options);
// Optimization: rope (concat strings) is faster than `[].join('')`
let stdout = '';
@ -116,7 +116,7 @@ export class AdbChildProcess {
* @returns The entire output of the command
*/
public async spawnAndWaitLegacy(command: string | string[]): Promise<string> {
const { stdout } = await this.spawnAndWait(command, { shells: [AdbLegacyShell] });
const { stdout } = await this.spawnAndWait(command, { protocols: [AdbNoneSubprocessProtocol] });
return stdout;
}
}

View file

@ -1,7 +1,8 @@
import { PromiseResolver } from "@yume-chan/async";
import type { Adb } from "../../adb";
import type { AdbSocket } from "../../socket";
import { ReadableStream, TransformStream } from "../../utils";
import type { AdbShell } from "./types";
import type { AdbSubprocessProtocol } from "./types";
/**
* The legacy shell
@ -11,11 +12,11 @@ import type { AdbShell } from "./types";
* * `exit` exit code: No
* * `resize`: No
*/
export class AdbLegacyShell implements AdbShell {
export class AdbNoneSubprocessProtocol implements AdbSubprocessProtocol {
public static isSupported() { return true; }
public static async spawn(adb: Adb, command: string) {
return new AdbLegacyShell(await adb.createSocket(`shell:${command}`));
return new AdbNoneSubprocessProtocol(await adb.createSocket(`shell:${command}`));
}
private readonly socket: AdbSocket;
@ -31,17 +32,17 @@ export class AdbLegacyShell implements AdbShell {
private _stderr = new TransformStream<ArrayBuffer, ArrayBuffer>();
public get stderr() { return this._stderr.readable; }
private _exit: Promise<number>;
public get exit() { return this._exit; }
private _exit = new PromiseResolver<number>();
public get exit() { return this._exit.promise; }
public constructor(socket: AdbSocket) {
this.socket = socket;
let exit;
[this._stdout, exit] = this.socket.readable.tee();
this._exit = exit.getReader().closed.then(() => {
this._stderr.writable.close();
return 0;
});
this._stdout = this.socket.readable.pipeThrough(new TransformStream({
flush: () => {
this._stderr.writable.close();
this._exit.resolve(0);
},
}));
}
public resize() {

View file

@ -5,7 +5,7 @@ import { AdbFeatures } from "../../features";
import type { AdbSocket } from "../../socket";
import { AdbBufferedStream } from "../../stream";
import { encodeUtf8, TransformStream, WritableStream, WritableStreamDefaultWriter } from "../../utils";
import type { AdbShell } from "./types";
import type { AdbSubprocessProtocol } from "./types";
export enum AdbShellProtocolId {
Stdin,
@ -26,22 +26,6 @@ function assertUnreachable(x: never): never {
throw new Error("Unreachable");
}
class WritableMultiplexer<T> {
private writer: WritableStreamDefaultWriter<T>;
public constructor(writer: WritableStreamDefaultWriter<T>) {
this.writer = writer;
}
public createWritable(): WritableStream<T> {
return new WritableStream({
write: (chunk) => {
return this.writer.write(chunk);
},
});
}
}
class StdinTransformStream extends TransformStream<ArrayBuffer, ArrayBuffer>{
constructor() {
super({
@ -65,20 +49,19 @@ class StdinTransformStream extends TransformStream<ArrayBuffer, ArrayBuffer>{
* * `exit` exit code: Yes
* * `resize`: Yes
*/
export class AdbShellProtocol implements AdbShell {
export class AdbShellSubprocessProtocol implements AdbSubprocessProtocol {
public static isSupported(adb: Adb) {
return adb.features!.includes(AdbFeatures.ShellV2);
}
public static async spawn(adb: Adb, command: string) {
// TODO: the service string may support more parameters
return new AdbShellProtocol(await adb.createSocket(`shell,v2,pty:${command}`));
return new AdbShellSubprocessProtocol(await adb.createSocket(`shell,v2,pty:${command}`));
}
private readonly stream: AdbBufferedStream;
private readonly _buffered: AdbBufferedStream;
private _writableMultiplexer: WritableMultiplexer<ArrayBuffer>;
private _writer: WritableStreamDefaultWriter<ArrayBuffer>;
private _socketWriter: WritableStreamDefaultWriter<ArrayBuffer>;
private _stdin = new StdinTransformStream();
public get stdin() { return this._stdin.writable; }
@ -95,26 +78,39 @@ export class AdbShellProtocol implements AdbShell {
public get exit() { return this._exit.promise; }
public constructor(socket: AdbSocket) {
this.stream = new AdbBufferedStream(socket);
this._buffered = new AdbBufferedStream(socket);
this.readData();
this._writableMultiplexer = new WritableMultiplexer(socket.writable.getWriter());
this._writer = this._writableMultiplexer.createWritable().getWriter();
this._stdin.readable.pipeTo(this._writableMultiplexer.createWritable());
this._socketWriter = socket.writable.getWriter();
this._stdin.readable.pipeTo(new WritableStream<ArrayBuffer>({
write: async (chunk) => {
await this._socketWriter.ready;
await this._socketWriter.write(chunk);
},
close() {
// TODO: Shell protocol: close stdin
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
}));
}
private async readData() {
while (true) {
try {
// TODO: add back pressure to AdbShellProtocol
const packet = await AdbShellProtocolPacket.deserialize(this.stream);
const packet = await AdbShellProtocolPacket.deserialize(this._buffered);
switch (packet.id) {
case AdbShellProtocolId.Stdout:
await this._stdoutWriter.ready;
this._stdoutWriter.write(packet.data);
break;
case AdbShellProtocolId.Stderr:
await this._stderrWriter.ready;
this._stderrWriter.write(packet.data);
break;
case AdbShellProtocolId.Exit:
this._stdoutWriter.close();
this._stderrWriter.close();
this._exit.resolve(new Uint8Array(packet.data)[0]!);
break;
case AdbShellProtocolId.CloseStdin:
@ -132,7 +128,7 @@ export class AdbShellProtocol implements AdbShell {
}
public async resize(rows: number, cols: number) {
await this._writer.write(
await this._socketWriter.write(
AdbShellProtocolPacket.serialize(
{
id: AdbShellProtocolId.WindowSizeChange,
@ -148,6 +144,6 @@ export class AdbShellProtocol implements AdbShell {
}
public kill() {
return this.stream.close();
return this._buffered.close();
}
}

View file

@ -3,7 +3,7 @@ import type { Adb } from "../../adb";
import type { AdbSocket } from "../../socket";
import type { ReadableStream, WritableStream } from "../../utils";
export interface AdbShell {
export interface AdbSubprocessProtocol {
/**
* A WritableStream that writes to the `stdin` pipe.
*/
@ -43,13 +43,13 @@ export interface AdbShell {
kill(): ValueOrPromise<void>;
}
export interface AdbShellConstructor {
export interface AdbSubprocessProtocolConstructor {
/** Returns `true` if the `adb` instance supports this shell */
isSupported(adb: Adb): ValueOrPromise<boolean>;
/** Creates a new `AdbShell` using the specified `Adb` and `command` */
spawn(adb: Adb, command: string): ValueOrPromise<AdbShell>;
spawn(adb: Adb, command: string): ValueOrPromise<AdbSubprocessProtocol>;
/** Creates a new `AdbShell` by attaching to an exist `AdbSocket` */
new(socket: AdbSocket): AdbShell;
new(socket: AdbSocket): AdbSubprocessProtocol;
}

View file

@ -19,15 +19,10 @@ export function adbSyncPull(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
path: string,
bufferSize: number = 16 * 1024,
): ReadableStream<ArrayBuffer> {
return new ReadableStream({
async start(controller) {
try {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Receive, path);
} catch (e) {
controller.error(e);
}
async start() {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Receive, path);
},
async pull(controller) {
const response = await adbSyncReadResponse(stream, ResponseTypes);
@ -39,12 +34,11 @@ export function adbSyncPull(
controller.close();
break;
default:
controller.error(new Error('Unexpected response id'));
break;
throw new Error('Unexpected response id');
}
}
}, {
highWaterMark: bufferSize,
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; }
});
}

View file

@ -22,9 +22,7 @@ export function adbSyncPush(
mode: number = (LinuxFileType.File << 12) | 0o666,
mtime: number = (Date.now() / 1000) | 0,
packetSize: number = AdbSyncMaxPacketSize,
onProgress?: (uploaded: number) => void,
): WritableStream<ArrayBuffer> {
let uploaded = 0;
return new WritableStream({
async start() {
const pathAndMode = `${filename},${mode.toString()}`;
@ -33,13 +31,14 @@ export function adbSyncPush(
async write(chunk) {
for (const buffer of chunkArrayLike(chunk, packetSize)) {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Data, buffer);
uploaded += buffer.byteLength;
onProgress?.(uploaded);
}
},
async close() {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Done, mtime);
await adbSyncReadResponse(stream, ResponseTypes);
}
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; }
});
}

View file

@ -120,5 +120,5 @@ export async function adbSyncStat(
path: string,
): Promise<AdbSyncStatResponse> {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Stat, path);
return adbSyncReadResponse(stream, StatResponseType);
return await adbSyncReadResponse(stream, StatResponseType);
}

View file

@ -104,27 +104,17 @@ export class AdbSync extends AutoDisposable {
return results;
}
public read(filename: string, highWaterMark = 16 * 1024): ReadableStream<ArrayBuffer> {
const readable = adbSyncPull(this.stream, this.writer, filename, highWaterMark);
return readable.pipeThrough(new LockTransformStream(
this.sendLock,
{ highWaterMark, size(chunk) { return chunk.byteLength; } },
{ highWaterMark, size(chunk) { return chunk.byteLength; } }
));
public read(filename: string): ReadableStream<ArrayBuffer> {
const readable = adbSyncPull(this.stream, this.writer, filename);
return readable.pipeThrough(new LockTransformStream(this.sendLock));
}
public write(
filename: string,
mode?: number,
mtime?: number,
onProgress?: (uploaded: number) => void,
highWaterMark = 16 * 1024,
): WritableStream<ArrayBuffer> {
const lockStream = new LockTransformStream<ArrayBuffer>(
this.sendLock,
{ highWaterMark, size(chunk) { return chunk.byteLength; } },
{ highWaterMark, size(chunk) { return chunk.byteLength; } }
);
const lockStream = new LockTransformStream<ArrayBuffer>(this.sendLock);
const writable = adbSyncPush(
this.stream,
@ -132,8 +122,6 @@ export class AdbSync extends AutoDisposable {
filename,
mode,
mtime,
undefined,
onProgress
);
lockStream.readable.pipeTo(writable);

View file

@ -30,7 +30,7 @@ export type AdbPacket = typeof AdbPacketStruct['TDeserializeResult'];
export type AdbPacketInit = Omit<typeof AdbPacketStruct['TInit'], 'checksum' | 'magic'>;
export class AdbPacketStream extends TransformStream<ArrayBuffer, AdbPacket> {
private _passthrough = new TransformStream<ArrayBuffer, ArrayBuffer>({});
private _passthrough = new TransformStream<ArrayBuffer, ArrayBuffer>();
private _passthroughWriter = this._passthrough.writable.getWriter();
private _buffered = new BufferedStream(this._passthrough.readable);
private _controller!: TransformStreamDefaultController<AdbPacket>;

View file

@ -1,8 +1,7 @@
// cspell: ignore bugreport
// cspell: ignore bugreportz
import { AdbCommandBase, AdbShellProtocol, decodeUtf8, EventQueue, EventQueueEndedError } from "@yume-chan/adb";
import { once } from "@yume-chan/event";
import { AdbCommandBase, AdbShellSubprocessProtocol, decodeUtf8, TransformStream } from "@yume-chan/adb";
function* splitLines(text: string): Generator<string, void, void> {
let start = 0;
@ -43,11 +42,11 @@ export class BugReportZ extends AdbCommandBase {
*/
public async version(): Promise<BugReportVersion | undefined> {
// bugreportz requires shell protocol
if (!AdbShellProtocol.isSupported(this.adb)) {
if (!AdbShellSubprocessProtocol.isSupported(this.adb)) {
return undefined;
}
const { stderr, exitCode } = await this.adb.childProcess.spawnAndWait(['bugreportz', '-v']);
const { stderr, exitCode } = await this.adb.subprocess.spawnAndWait(['bugreportz', '-v']);
if (exitCode !== 0 || stderr === '') {
return undefined;
}
@ -67,22 +66,9 @@ export class BugReportZ extends AdbCommandBase {
return version.major > 1 || version.minor >= 2;
}
public async *stream(): AsyncGenerator<ArrayBuffer, void, void> {
const process = await this.adb.childProcess.spawn(['bugreportz', '-s']);
const queue = new EventQueue<ArrayBuffer>();
process.onStdout(buffer => queue.enqueue(buffer));
process.onExit(() => queue.end());
try {
while (true) {
yield await queue.dequeue();
}
} catch (e) {
if (e instanceof EventQueueEndedError) {
return;
}
throw e;
}
public async stream(): Promise<ReadableStream<ArrayBuffer>> {
const process = await this.adb.subprocess.spawn(['bugreportz', '-s']);
return process.stdout;
}
public supportProgress(version: BugReportVersion): boolean {
@ -98,7 +84,7 @@ export class BugReportZ extends AdbCommandBase {
* @returns The path of the bugreport file.
*/
public async generate(onProgress?: (progress: string, total: string) => void): Promise<string> {
const process = await this.adb.childProcess.spawn([
const process = await this.adb.subprocess.spawn([
'bugreportz',
...(onProgress ? ['-p'] : []),
]);
@ -106,33 +92,35 @@ export class BugReportZ extends AdbCommandBase {
let filename: string | undefined;
let error: string | undefined;
process.onStdout(buffer => {
const string = decodeUtf8(buffer);
for (const line of splitLines(string)) {
// (Not 100% sure) `BEGIN:` and `PROGRESS:` only appear when `-p` is specified.
let match = line.match(BugReportZ.PROGRESS_REGEX);
if (match) {
onProgress?.(match[1]!, match[2]!);
}
await process.stdout.pipeTo(new WritableStream({
write(chunk) {
const string = decodeUtf8(chunk);
for (const line of splitLines(string)) {
// (Not 100% sure) `BEGIN:` and `PROGRESS:` only appear when `-p` is specified.
let match = line.match(BugReportZ.PROGRESS_REGEX);
if (match) {
onProgress?.(match[1]!, match[2]!);
}
match = line.match(BugReportZ.BEGIN_REGEX);
if (match) {
filename = match[1]!;
}
match = line.match(BugReportZ.BEGIN_REGEX);
if (match) {
filename = match[1]!;
}
match = line.match(BugReportZ.OK_REGEX);
if (match) {
filename = match[1];
}
match = line.match(BugReportZ.OK_REGEX);
if (match) {
filename = match[1];
}
match = line.match(BugReportZ.FAIL_REGEX);
if (match) {
error = match[1];
match = line.match(BugReportZ.FAIL_REGEX);
if (match) {
// Don't report error now
// We want to gather all output.
error = match[1];
}
}
}
});
await once(process.onExit);
}));
if (error) {
throw new Error(error);
@ -148,21 +136,12 @@ export class BugReportZ extends AdbCommandBase {
}
export class BugReport extends AdbCommandBase {
public async *generate(): AsyncGenerator<string, void, void> {
const process = await this.adb.childProcess.spawn(['bugreport']);
const queue = new EventQueue<ArrayBuffer>();
process.onStdout(buffer => queue.enqueue(buffer));
process.onExit(() => queue.end());
try {
while (true) {
yield decodeUtf8(await queue.dequeue());
public async generate(): Promise<ReadableStream<string>> {
const process = await this.adb.subprocess.spawn(['bugreport']);
return process.stdout.pipeThrough(new TransformStream({
transform(chunk, controller) {
controller.enqueue(decodeUtf8(chunk));
}
} catch (e) {
if (e instanceof EventQueueEndedError) {
return;
}
throw e;
}
}));
}
}

View file

@ -71,7 +71,7 @@ export class DemoMode extends AdbCommandBase {
}
public async broadcast(command: string, extra?: Record<string, string>): Promise<void> {
await this.adb.childProcess.spawnAndWaitLegacy([
await this.adb.subprocess.spawnAndWaitLegacy([
'am',
'broadcast',
'-a',

View file

@ -9,7 +9,7 @@ export class Settings extends AdbCommandBase {
// TODO: `--user <user>` argument
public base(command: string, namespace: SettingsNamespace, ...args: string[]) {
return this.adb.childProcess.spawnAndWaitLegacy(['settings', command, namespace, ...args]);
return this.adb.subprocess.spawnAndWaitLegacy(['settings', command, namespace, ...args]);
}
public get(namespace: SettingsNamespace, key: string) {