diff --git a/apps/demo/src/components/connect.tsx b/apps/demo/src/components/connect.tsx index 0db107ad..26b3f275 100644 --- a/apps/demo/src/components/connect.tsx +++ b/apps/demo/src/components/connect.tsx @@ -169,6 +169,14 @@ function _Connect(): JSX.Element | null { return; } + function dispose() { + // Adb won't close the streams, + // so manually close them. + try { readable.cancel(); } catch { } + try { writable.close(); } catch { } + globalState.setDevice(undefined, undefined); + } + try { const device = await Adb.authenticate( { readable, writable }, @@ -177,20 +185,16 @@ function _Connect(): JSX.Element | null { ); device.disconnected.then(() => { - globalState.setDevice(undefined, undefined); + dispose(); }, (e) => { globalState.showErrorDialog(e); - globalState.setDevice(undefined, undefined); + dispose(); }); globalState.setDevice(selectedBackend, device); } catch (e: any) { globalState.showErrorDialog(e); - - // The streams are still open when Adb authentication failed, - // manually close them to release the device. - readable.cancel(); - writable.close(); + dispose(); } finally { setConnecting(false); } diff --git a/libraries/adb-backend-webusb/src/backend.ts b/libraries/adb-backend-webusb/src/backend.ts index 66954325..bb5c4b5e 100644 --- a/libraries/adb-backend-webusb/src/backend.ts +++ b/libraries/adb-backend-webusb/src/backend.ts @@ -1,4 +1,4 @@ -import { AdbPacketHeader, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, type AdbBackend, type AdbPacketData, type AdbPacketInit, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb'; +import { AdbPacketHeader, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, WritableStream, type AdbBackend, type AdbPacketData, type AdbPacketInit, type ReadableWritablePair } from '@yume-chan/adb'; import type { StructDeserializeStream } from "@yume-chan/struct"; export const ADB_DEVICE_FILTER: USBDeviceFilter = { @@ -77,14 +77,14 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair { await device.transferOut(outEndpoint.endpointNumber, chunk); }, }, { highWaterMark: 16 * 1024, size(chunk) { return chunk.byteLength; }, - }), + })), new AdbPacketSerializeStream() ); } diff --git a/libraries/adb-backend-ws/src/index.ts b/libraries/adb-backend-ws/src/index.ts index 4b44b94c..196e150d 100644 --- a/libraries/adb-backend-ws/src/index.ts +++ b/libraries/adb-backend-ws/src/index.ts @@ -1,4 +1,4 @@ -import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb'; +import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, WritableStream, type AdbBackend } from '@yume-chan/adb'; export default class AdbWsBackend implements AdbBackend { public readonly serial: string; @@ -42,14 +42,14 @@ export default class AdbWsBackend implements AdbBackend { size(chunk) { return chunk.byteLength; }, })); - const writable = factory.createWritable({ + const writable = factory.createWritable(new WritableStream({ write: (chunk) => { socket.send(chunk); }, }, { highWaterMark: 16 * 1024, size(chunk) { return chunk.byteLength; }, - }); + })); return { readable: readable.pipeThrough(new StructDeserializeStream(AdbPacket)), diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index d2622cd9..1ccbac01 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -76,6 +76,7 @@ export class Adb implements Closeable { await writer.write(calculateChecksum(init)); } + let banner: string; try { // https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 // There are some other feature constants, but some of them are only used by ADB server, not devices (daemons). @@ -109,27 +110,23 @@ export class Adb implements Closeable { payload: encodeUtf8(`host::features=${features};`), }); - const banner = await resolver.promise; - - // Stop piping before creating `Adb` object - // Because `AdbPacketDispatcher` will lock the streams when initializing + banner = await resolver.promise; + } finally { + // When failed, release locks on `connection` so the caller can try again. + // When success, also release locks so `AdbPacketDispatcher` can use them. abortController.abort(); writer.releaseLock(); // Wait until pipe stops (`ReadableStream` lock released) await pipe; - - return new Adb( - connection, - version, - maxPayloadSize, - banner, - ); - } catch (e) { - abortController.abort(); - writer.releaseLock(); - throw e; } + + return new Adb( + connection, + version, + maxPayloadSize, + banner, + ); } private readonly dispatcher: AdbPacketDispatcher; diff --git a/libraries/adb/src/commands/sync/sync.ts b/libraries/adb/src/commands/sync/sync.ts index 41337881..9f6d27f4 100644 --- a/libraries/adb/src/commands/sync/sync.ts +++ b/libraries/adb/src/commands/sync/sync.ts @@ -175,7 +175,7 @@ export class AdbSync extends AutoDisposable { public override async dispose() { super.dispose(); - this.stream.close(); + await this.stream.close(); await this.writer.close(); } } diff --git a/libraries/adb/src/socket/dispatcher.ts b/libraries/adb/src/socket/dispatcher.ts index 38a78c99..94748bcf 100644 --- a/libraries/adb/src/socket/dispatcher.ts +++ b/libraries/adb/src/socket/dispatcher.ts @@ -47,6 +47,7 @@ export class AdbPacketDispatcher implements Closeable { public readonly options: AdbPacketDispatcherOptions; + private _closed = false; private _disconnected = new PromiseResolver(); public get disconnected() { return this._disconnected.promise; } @@ -90,17 +91,22 @@ export class AdbPacketDispatcher implements Closeable { }, }), { // There are multiple reasons for the pipe to stop, - // including device disconnection, protocol error, or user abort, + // (device disconnection, protocol error, or user abortion) // if the underlying streams are still open, // it's still possible to create another ADB connection. // So don't close `readable` here. - preventCancel: false, + preventCancel: true, signal: this._abortController.signal, }) .then(() => { this.dispose(); }, (e) => { - this._disconnected.reject(e); + // https://github.com/MattiasBuelens/web-streams-polyfill/issues/115 + // `e` is always `AbortError` (instead of what I give in `abortController.abort()`) + // so we can't check if `e` is a real error. + if (!this._closed) { + this._disconnected.reject(e); + } this.dispose(); }); @@ -265,6 +271,7 @@ export class AdbPacketDispatcher implements Closeable { (init as AdbPacketInit).checksum = 0; } + await this._writer.ready; await this._writer.write(init as AdbPacketInit); } @@ -280,13 +287,11 @@ export class AdbPacketDispatcher implements Closeable { // Stop receiving // It's possible that we haven't received all `CLSE` confirm packets, // but it doesn't matter, the next connection can cope with them. - try { - this._abortController.abort(); - } catch { } + this._closed = true; + this._abortController.abort(); + this._writer.releaseLock(); - // Adb connection doesn't have a method to confirm closing, - // so call `dispose` immediately - this.dispose(); + // `pipe().then()` will call `dispose` } private dispose() { @@ -294,8 +299,6 @@ export class AdbPacketDispatcher implements Closeable { socket.dispose(); } - this._writer.releaseLock(); - this._disconnected.resolve(); } } diff --git a/libraries/adb/src/socket/socket.ts b/libraries/adb/src/socket/socket.ts index c647c7d0..bbb4dbdb 100644 --- a/libraries/adb/src/socket/socket.ts +++ b/libraries/adb/src/socket/socket.ts @@ -1,7 +1,7 @@ import { PromiseResolver } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; import { AdbCommand } from '../packet.js'; -import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair, type WritableStream } from '../stream/index.js'; +import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, WritableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair } from '../stream/index.js'; import type { AdbPacketDispatcher, Closeable } from './dispatcher.js'; export interface AdbSocketInfo { @@ -77,19 +77,21 @@ export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair< ); this.writable = pipeFrom( - this._factory.createWritable({ - write: async (chunk) => { - // Wait for an ack packet - this._writePromise = new PromiseResolver(); - await this.dispatcher.sendPacket( - AdbCommand.Write, - this.localId, - this.remoteId, - chunk - ); - await this._writePromise.promise; - }, - }), + this._factory.createWritable( + new WritableStream({ + write: async (chunk) => { + // Wait for an ack packet + this._writePromise = new PromiseResolver(); + await this.dispatcher.sendPacket( + AdbCommand.Write, + this.localId, + this.remoteId, + chunk + ); + await this._writePromise.promise; + } + }), + ), new ChunkStream(this.dispatcher.options.maxPayloadSize) ); diff --git a/libraries/adb/src/stream/buffered.ts b/libraries/adb/src/stream/buffered.ts index 4ecdbbe2..1d1eac4b 100644 --- a/libraries/adb/src/stream/buffered.ts +++ b/libraries/adb/src/stream/buffered.ts @@ -123,8 +123,8 @@ export class BufferedStream { } } - public close() { - this.reader.cancel(); + public async close() { + await this.reader.cancel(); } } diff --git a/libraries/adb/src/stream/detect.polyfill.ts b/libraries/adb/src/stream/detect.polyfill.ts index 2d0b8507..6434bc16 100644 --- a/libraries/adb/src/stream/detect.polyfill.ts +++ b/libraries/adb/src/stream/detect.polyfill.ts @@ -17,7 +17,7 @@ export interface AbortController { /** * Invoking this method will set this object's AbortSignal's aborted flag and signal to any observers that the associated activity is to be aborted. */ - abort(): void; + abort(reason?: any): void; } export let AbortController: { diff --git a/libraries/adb/src/stream/transform.ts b/libraries/adb/src/stream/transform.ts index f76802b7..52d6ae00 100644 --- a/libraries/adb/src/stream/transform.ts +++ b/libraries/adb/src/stream/transform.ts @@ -3,7 +3,7 @@ import type Struct from "@yume-chan/struct"; import type { StructValueType, ValueOrPromise } from "@yume-chan/struct"; import { decodeUtf8 } from "../utils/index.js"; import { BufferedStream, BufferedStreamEndedError } from "./buffered.js"; -import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair, type UnderlyingSink } from "./detect.js"; +import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair } from "./detect.js"; export interface DuplexStreamFactoryOptions { close?: (() => ValueOrPromise) | undefined; @@ -19,6 +19,7 @@ export interface DuplexStreamFactoryOptions { */ export class DuplexStreamFactory { private readableControllers: ReadableStreamDefaultController[] = []; + private writers: WritableStreamDefaultWriter[] = []; private _writableClosed = false; public get writableClosed() { return this._writableClosed; } @@ -49,29 +50,26 @@ export class DuplexStreamFactory { }); } - public createWritable(sink: UnderlyingSink, strategy?: QueuingStrategy): WritableStream { + public createWritable(stream: WritableStream): WritableStream { + const writer = stream.getWriter(); + this.writers.push(writer); + // `WritableStream` has no way to tell if the remote peer has closed the connection. // So it only triggers `close`. return new WritableStream({ - start: async (controller) => { - await sink.start?.(controller); - }, - write: async (chunk, controller) => { - if (this._writableClosed) { - throw new Error("stream is closed"); - } - - await sink.write?.(chunk, controller); + write: async (chunk) => { + await writer.ready; + await writer.write(chunk); }, abort: async (reason) => { - await sink.abort?.(reason); + await writer.abort(reason); await this.close(); }, close: async () => { - await sink.close?.(); + try { await writer.close(); } catch { } await this.close(); }, - }, strategy); + }); } public async close() { @@ -79,10 +77,15 @@ export class DuplexStreamFactory { return; } this._writableClosed = true; + if (await this.options.close?.() !== false) { // `close` can return `false` to disable automatic `dispose`. await this.dispose(); } + + for (const writer of this.writers) { + try { await writer.close(); } catch { } + } } public async dispose() {