From fe5cb5a176b1b8a7f7fcfda5dc12082494e87647 Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Tue, 17 Oct 2023 21:13:36 +0800 Subject: [PATCH] feat(adb): don't kill subprocess on stream close --- libraries/adb-server-node-tcp/src/index.ts | 46 ++++++---- libraries/adb/src/adb.ts | 4 +- .../src/commands/subprocess/protocols/none.ts | 27 +++--- .../commands/subprocess/protocols/shell.ts | 9 +- libraries/adb/src/server/client.ts | 88 +++++++++++-------- libraries/stream-extra/src/push-readable.ts | 5 +- 6 files changed, 102 insertions(+), 77 deletions(-) diff --git a/libraries/adb-server-node-tcp/src/index.ts b/libraries/adb-server-node-tcp/src/index.ts index 49216584..43b7c698 100644 --- a/libraries/adb-server-node-tcp/src/index.ts +++ b/libraries/adb-server-node-tcp/src/index.ts @@ -5,8 +5,8 @@ import type { AdbIncomingSocketHandler, AdbServerConnection, AdbServerConnectionOptions, + AdbServerConnector, } from "@yume-chan/adb"; -import type { ReadableWritablePair } from "@yume-chan/stream-extra"; import { PushReadableStream, UnwrapConsumableStream, @@ -15,12 +15,21 @@ import { } from "@yume-chan/stream-extra"; import type { ValueOrPromise } from "@yume-chan/struct"; -function nodeSocketToStreamPair(socket: Socket) { +function nodeSocketToConnection(socket: Socket): AdbServerConnection { socket.setNoDelay(true); + + const closed = new Promise((resolve) => { + socket.on("close", resolve); + }); + return { readable: new PushReadableStream((controller) => { // eslint-disable-next-line @typescript-eslint/no-misused-promises socket.on("data", async (data) => { + if (controller.abortSignal.aborted) { + return; + } + socket.pause(); await controller.enqueue(data); socket.resume(); @@ -32,9 +41,6 @@ function nodeSocketToStreamPair(socket: Socket) { // controller already closed } }); - controller.abortSignal.addEventListener("abort", () => { - socket.end(); - }); }), writable: new WritableStream({ write: async (chunk) => { @@ -48,16 +54,17 @@ function nodeSocketToStreamPair(socket: Socket) { }); }); }, - close() { - return new Promise((resolve) => { - socket.end(resolve); - }); - }, }), + get closed() { + return closed; + }, + close() { + socket.end(); + }, }; } -export class AdbServerNodeTcpConnection implements AdbServerConnection { +export class AdbServerNodeTcpConnector implements AdbServerConnector { readonly spec: SocketConnectOpts; readonly #listeners = new Map(); @@ -68,7 +75,7 @@ export class AdbServerNodeTcpConnection implements AdbServerConnection { async connect( { unref }: AdbServerConnectionOptions = { unref: false }, - ): Promise> { + ): Promise { const socket = new Socket(); if (unref) { socket.unref(); @@ -78,7 +85,7 @@ export class AdbServerNodeTcpConnection implements AdbServerConnection { socket.once("connect", resolve); socket.once("error", reject); }); - return nodeSocketToStreamPair(socket); + return nodeSocketToConnection(socket); } async addReverseTunnel( @@ -87,16 +94,19 @@ export class AdbServerNodeTcpConnection implements AdbServerConnection { ): Promise { // eslint-disable-next-line @typescript-eslint/no-misused-promises const server = new Server(async (socket) => { - const stream = nodeSocketToStreamPair(socket); + const connection = nodeSocketToConnection(socket); try { await handler({ service: address!, - readable: stream.readable, + readable: connection.readable, writable: new WrapWritableStream( - stream.writable, + connection.writable, ).bePipedThroughFrom(new UnwrapConsumableStream()), - close() { - socket.end(); + get closed() { + return connection.closed; + }, + async close() { + await connection.close(); }, }); } catch { diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index 10f4f8d1..bdb384d2 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -22,7 +22,9 @@ export interface Closeable { export interface AdbSocket extends ReadableWritablePair>, Closeable { - readonly service: string; + get service(): string; + + get closed(): Promise; } export type AdbIncomingSocketHandler = ( diff --git a/libraries/adb/src/commands/subprocess/protocols/none.ts b/libraries/adb/src/commands/subprocess/protocols/none.ts index 316ecb4b..e012d297 100644 --- a/libraries/adb/src/commands/subprocess/protocols/none.ts +++ b/libraries/adb/src/commands/subprocess/protocols/none.ts @@ -1,7 +1,8 @@ import type { Consumable, WritableStream } from "@yume-chan/stream-extra"; -import { DuplexStreamFactory, ReadableStream } from "@yume-chan/stream-extra"; +import { ReadableStream } from "@yume-chan/stream-extra"; import type { Adb, AdbSocket } from "../../../adb.js"; +import { unreachable } from "../../../utils/index.js"; import type { AdbSubprocessProtocol } from "./types.js"; @@ -34,19 +35,16 @@ export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol { readonly #socket: AdbSocket; - readonly #duplex: DuplexStreamFactory; - // Legacy shell forwards all data to stdin. get stdin(): WritableStream> { return this.#socket.writable; } - #stdout: ReadableStream; /** * Legacy shell mixes stdout and stderr. */ get stdout(): ReadableStream { - return this.#stdout; + return this.#socket.readable; } #stderr: ReadableStream; @@ -65,24 +63,21 @@ export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol { constructor(socket: AdbSocket) { this.#socket = socket; - // Link `stdout`, `stderr` and `stdin` together, - // so closing any of them will close the others. - this.#duplex = new DuplexStreamFactory({ - close: async () => { - await this.#socket.close(); + this.#stderr = new ReadableStream({ + start: (controller) => { + this.#socket.closed + .then(() => controller.close()) + .catch(unreachable); }, }); - - this.#stdout = this.#duplex.wrapReadable(this.#socket.readable); - this.#stderr = this.#duplex.wrapReadable(new ReadableStream()); - this.#exit = this.#duplex.closed.then(() => 0); + this.#exit = socket.closed.then(() => 0); } resize() { // Not supported, but don't throw. } - kill() { - return this.#duplex.close(); + async kill() { + await this.#socket.close(); } } diff --git a/libraries/adb/src/commands/subprocess/protocols/shell.ts b/libraries/adb/src/commands/subprocess/protocols/shell.ts index 124f2440..2a087e3f 100644 --- a/libraries/adb/src/commands/subprocess/protocols/shell.ts +++ b/libraries/adb/src/commands/subprocess/protocols/shell.ts @@ -159,6 +159,7 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol { let stdoutController!: PushReadableStreamController; let stderrController!: PushReadableStreamController; + this.#stdout = new PushReadableStream((controller) => { stdoutController = controller; }); @@ -176,10 +177,14 @@ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol { this.#exit.resolve(chunk.data[0]!); break; case AdbShellProtocolId.Stdout: - await stdoutController.enqueue(chunk.data); + if (!stdoutController.abortSignal.aborted) { + await stdoutController.enqueue(chunk.data); + } break; case AdbShellProtocolId.Stderr: - await stderrController.enqueue(chunk.data); + if (!stderrController.abortSignal.aborted) { + await stderrController.enqueue(chunk.data); + } break; } }, diff --git a/libraries/adb/src/server/client.ts b/libraries/adb/src/server/client.ts index e1eb92de..d8071e09 100644 --- a/libraries/adb/src/server/client.ts +++ b/libraries/adb/src/server/client.ts @@ -3,13 +3,11 @@ import { PromiseResolver } from "@yume-chan/async"; import type { AbortSignal, - Consumable, ReadableWritablePair, WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; import { BufferedReadableStream, - DuplexStreamFactory, UnwrapConsumableStream, WrapWritableStream, } from "@yume-chan/stream-extra"; @@ -25,7 +23,7 @@ import { encodeUtf8, } from "@yume-chan/struct"; -import type { AdbIncomingSocketHandler, AdbSocket } from "../adb.js"; +import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js"; import { AdbBanner } from "../banner.js"; import type { AdbFeature } from "../features.js"; import { NOOP, hexToNumber, numberToHex } from "../utils/index.js"; @@ -37,10 +35,16 @@ export interface AdbServerConnectionOptions { signal?: AbortSignal | undefined; } -export interface AdbServerConnection { +export interface AdbServerConnection + extends ReadableWritablePair, + Closeable { + get closed(): Promise; +} + +export interface AdbServerConnector { connect( options?: AdbServerConnectionOptions, - ): ValueOrPromise>; + ): ValueOrPromise; addReverseTunnel( handler: AdbIncomingSocketHandler, @@ -74,9 +78,9 @@ export interface AdbServerDevice { export class AdbServerClient { static readonly VERSION = 41; - readonly connection: AdbServerConnection; + readonly connection: AdbServerConnector; - constructor(connection: AdbServerConnection) { + constructor(connection: AdbServerConnector) { this.connection = connection; } @@ -126,30 +130,41 @@ export class AdbServerClient { async connect( request: string, options?: AdbServerConnectionOptions, - ): Promise> { + ): Promise { const connection = await this.connection.connect(options); - const writer = connection.writable.getWriter(); - await AdbServerClient.writeString(writer, request); + try { + const writer = connection.writable.getWriter(); + await AdbServerClient.writeString(writer, request); + writer.releaseLock(); + } catch (e) { + await connection.readable.cancel(); + await connection.close(); + throw e; + } const readable = new BufferedReadableStream(connection.readable); - try { - // `raceSignal` throws if the signal is aborted, + // `raceSignal` throws when the signal is aborted, // so the `catch` block can close the connection. await raceSignal( () => AdbServerClient.readOkay(readable), options?.signal, ); - writer.releaseLock(); return { readable: readable.release(), writable: connection.writable, + get closed() { + return connection.closed; + }, + async close() { + await connection.close(); + }, }; } catch (e) { - writer.close().catch(NOOP); - readable.cancel().catch(NOOP); + await readable.cancel().catch(NOOP); + await connection.close(); throw e; } } @@ -328,8 +343,18 @@ export class AdbServerClient { } const connection = await this.connect(switchService); + + try { + const writer = connection.writable.getWriter(); + await AdbServerClient.writeString(writer, service); + writer.releaseLock(); + } catch (e) { + await connection.readable.cancel(); + await connection.close(); + throw e; + } + const readable = new BufferedReadableStream(connection.readable); - const writer = connection.writable.getWriter(); try { if (transportId === undefined) { const array = await readable.readExactly(8); @@ -342,34 +367,25 @@ export class AdbServerClient { transportId = BigIntFieldType.Uint64.getter(dataView, 0, true); } - await AdbServerClient.writeString(writer, service); await AdbServerClient.readOkay(readable); - writer.releaseLock(); - - const duplex = new DuplexStreamFactory< - Uint8Array, - Consumable - >(); - const wrapReadable = duplex.wrapReadable(readable.release()); - const wrapWritable = duplex.createWritable( - new WrapWritableStream(connection.writable).bePipedThroughFrom( - new UnwrapConsumableStream(), - ), - ); - return { transportId, service, - readable: wrapReadable, - writable: wrapWritable, - close() { - return duplex.close(); + readable: readable.release(), + writable: new WrapWritableStream( + connection.writable, + ).bePipedThroughFrom(new UnwrapConsumableStream()), + get closed() { + return connection.closed; + }, + async close() { + await connection.close(); }, }; } catch (e) { - writer.close().catch(NOOP); - readable.cancel().catch(NOOP); + await readable.cancel().catch(NOOP); + await connection.close(); throw e; } } diff --git a/libraries/stream-extra/src/push-readable.ts b/libraries/stream-extra/src/push-readable.ts index ab37b71e..0db59c0e 100644 --- a/libraries/stream-extra/src/push-readable.ts +++ b/libraries/stream-extra/src/push-readable.ts @@ -45,10 +45,7 @@ export class PushReadableStream extends ReadableStream { if (abortController.signal.aborted) { // If the stream is already cancelled, // throw immediately. - throw ( - abortController.signal.reason ?? - new Error("Aborted") - ); + throw abortController.signal.reason; } if (controller.desiredSize === null) {