diff --git a/libraries/adb/src/daemon/socket.ts b/libraries/adb/src/daemon/socket.ts index 26d25ee6..839c8103 100644 --- a/libraries/adb/src/daemon/socket.ts +++ b/libraries/adb/src/daemon/socket.ts @@ -1,7 +1,6 @@ import { PromiseResolver } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; import type { - AbortSignal, PushReadableStreamController, ReadableStream, WritableStream, @@ -67,12 +66,12 @@ export class AdbDaemonSocketController #availableWriteBytesChanged: PromiseResolver | undefined; /** - * When delayed ack is disabled, can be `Infinity` if the socket is ready to write. - * Exactly one packet can be written no matter how large it is. Or `-1` if the socket - * is waiting for ack. + * When delayed ack is disabled, returns `Infinity` if the socket is ready to write + * (exactly one packet can be written no matter how large it is), or `-1` if the socket + * is waiting for ack message. * - * When delayed ack is enabled, a non-negative finite number indicates the number of - * bytes that can be written to the socket before receiving an ack. + * When delayed ack is enabled, returns a non-negative finite number indicates the number of + * bytes that can be written to the socket before waiting for ack message. */ #availableWriteBytes = 0; @@ -90,8 +89,13 @@ export class AdbDaemonSocketController this.writable = new MaybeConsumable.WritableStream({ start: (controller) => { this.#writableController = controller; + controller.signal.addEventListener("abort", () => { + this.#availableWriteBytesChanged?.reject( + controller.signal.reason, + ); + }); }, - write: async (data, controller) => { + write: async (data) => { const size = data.length; const chunkSize = this.#dispatcher.options.maxPayloadSize; for ( @@ -100,7 +104,7 @@ export class AdbDaemonSocketController start = end, end += chunkSize ) { const chunk = data.subarray(start, end); - await this.#writeChunk(chunk, controller.signal); + await this.#writeChunk(chunk); } }, }); @@ -109,16 +113,12 @@ export class AdbDaemonSocketController this.#availableWriteBytes = options.availableWriteBytes; } - async #writeChunk(data: Uint8Array, signal: AbortSignal) { + async #writeChunk(data: Uint8Array) { const length = data.length; while (this.#availableWriteBytes < length) { // Only one lock is required because Web Streams API guarantees // that `write` is not reentrant. const resolver = new PromiseResolver(); - signal.addEventListener("abort", () => { - resolver.reject(signal.reason); - }); - this.#availableWriteBytesChanged = resolver; await resolver.promise; }