From 1aa7a92d2c245135630af372fb4c87bc22eac8eb Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Wed, 11 Oct 2023 00:25:07 +0800 Subject: [PATCH] fix(adb): read socket too slow can cause data loss --- libraries/adb-daemon-webusb/src/device.ts | 154 ++++++++++---------- libraries/adb/src/daemon/dispatcher.ts | 27 ++-- libraries/adb/src/daemon/socket.ts | 7 +- libraries/stream-extra/src/buffered.ts | 33 +++-- libraries/stream-extra/src/duplex.ts | 35 +++-- libraries/stream-extra/src/push-readable.ts | 35 ++++- libraries/stream-extra/src/wrap-readable.ts | 63 ++++---- 7 files changed, 190 insertions(+), 164 deletions(-) diff --git a/libraries/adb-daemon-webusb/src/device.ts b/libraries/adb-daemon-webusb/src/device.ts index 88a99b95..5d3a74bf 100644 --- a/libraries/adb-daemon-webusb/src/device.ts +++ b/libraries/adb-daemon-webusb/src/device.ts @@ -104,6 +104,8 @@ export class AdbDaemonWebUsbConnection return this.#device; } + #inEndpoint: USBEndpoint; + #readable: ReadableStream; get readable() { return this.#readable; @@ -121,6 +123,7 @@ export class AdbDaemonWebUsbConnection usbManager: USB, ) { this.#device = device; + this.#inEndpoint = inEndpoint; let closed = false; @@ -154,89 +157,19 @@ export class AdbDaemonWebUsbConnection usbManager.addEventListener("disconnect", handleUsbDisconnect); this.#readable = duplex.wrapReadable( - new ReadableStream({ - async pull(controller) { - try { - while (true) { - // The `length` argument in `transferIn` must not be smaller than what the device sent, - // otherwise it will return `babble` status without any data. - // ADB daemon sends each packet in two parts, the 24-byte header and the payload. - const result = await device.raw.transferIn( - inEndpoint.endpointNumber, - 24, - ); - - // Maximum payload size is 1MB, so reading 1MB data will always success, - // and always discards all lingering data. - // FIXME: Chrome on Windows doesn't support babble status. See the HACK below. - if (result.status === "babble") { - await device.raw.transferIn( - inEndpoint.endpointNumber, - 1024 * 1024, - ); - continue; - } - - // Per spec, the `result.data` always covers the whole `buffer`. - const buffer = new Uint8Array(result.data!.buffer); - const stream = new Uint8ArrayExactReadable(buffer); - - // Add `payload` field to its type, it's assigned below. - const packet = AdbPacketHeader.deserialize( - stream, - ) as AdbPacketHeader & { payload: Uint8Array }; - if (packet.payloadLength !== 0) { - // HACK: Chrome on Windows doesn't support babble status, - // so maybe we are not actually reading an ADB packet header. - // Currently the maximum payload size is 1MB, - // so if the payload length is larger than that, - // try to discard the data and receive again. - // https://crbug.com/1314358 - if (packet.payloadLength > 1024 * 1024) { - await device.raw.transferIn( - inEndpoint.endpointNumber, - 1024 * 1024, - ); - continue; - } - - const result = await device.raw.transferIn( - inEndpoint.endpointNumber, - packet.payloadLength, - ); - packet.payload = new Uint8Array( - result.data!.buffer, - ); - } else { - packet.payload = EMPTY_UINT8_ARRAY; - } - + new ReadableStream( + { + pull: async (controller) => { + const packet = await this.#transferIn(); + if (packet) { controller.enqueue(packet); - return; + } else { + controller.close(); } - } catch (e) { - // On Windows, disconnecting the device will cause `NetworkError` to be thrown, - // even before the `disconnect` event is fired. - // We need to wait a little bit and check if the device is still connected. - // https://github.com/WICG/webusb/issues/219 - if (isErrorName(e, "NetworkError")) { - await new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, 100); - }); - - if (closed) { - controller.close(); - } else { - throw e; - } - } - - throw e; - } + }, }, - }), + { highWaterMark: 0 }, + ), ); const zeroMask = outEndpoint.packetSize - 1; @@ -275,6 +208,67 @@ export class AdbDaemonWebUsbConnection new AdbPacketSerializeStream(), ); } + + async #transferIn(): Promise { + try { + while (true) { + // ADB daemon sends each packet in two parts, the 24-byte header and the payload. + const result = await this.#device.raw.transferIn( + this.#inEndpoint.endpointNumber, + this.#inEndpoint.packetSize, + ); + + if (result.data!.byteLength !== 24) { + continue; + } + + // Per spec, the `result.data` always covers the whole `buffer`. + const buffer = new Uint8Array(result.data!.buffer); + const stream = new Uint8ArrayExactReadable(buffer); + + // Add `payload` field to its type, it's assigned below. + const packet = AdbPacketHeader.deserialize( + stream, + ) as AdbPacketHeader & { payload: Uint8Array }; + + if (packet.magic !== (packet.command ^ 0xffffffff)) { + continue; + } + + if (packet.payloadLength !== 0) { + const result = await this.#device.raw.transferIn( + this.#inEndpoint.endpointNumber, + packet.payloadLength, + ); + packet.payload = new Uint8Array(result.data!.buffer); + } else { + packet.payload = EMPTY_UINT8_ARRAY; + } + + return packet; + } + } catch (e) { + // On Windows, disconnecting the device will cause `NetworkError` to be thrown, + // even before the `disconnect` event is fired. + // We need to wait a little bit and check if the device is still connected. + // https://github.com/WICG/webusb/issues/219 + if (isErrorName(e, "NetworkError")) { + await new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 100); + }); + + if (closed) { + return undefined; + } else { + throw e; + } + } + + throw e; + } + } } export class AdbDaemonWebUsbDevice implements AdbDaemonDevice { diff --git a/libraries/adb/src/daemon/dispatcher.ts b/libraries/adb/src/daemon/dispatcher.ts index 60450d0a..62971de9 100644 --- a/libraries/adb/src/daemon/dispatcher.ts +++ b/libraries/adb/src/daemon/dispatcher.ts @@ -87,20 +87,8 @@ export class AdbPacketDispatcher implements Closeable { await this.#handleClose(packet); break; case AdbCommand.Write: - if (this.#sockets.has(packet.arg1)) { - await this.#sockets - .get(packet.arg1)! - .enqueue(packet.payload); - await this.sendPacket( - AdbCommand.OK, - packet.arg1, - packet.arg0, - ); - break; - } - throw new Error( - `Unknown local socket id: ${packet.arg1}`, - ); + await this.#handleWrite(packet); + break; case AdbCommand.Open: await this.#handleOpen(packet); break; @@ -200,6 +188,17 @@ export class AdbPacketDispatcher implements Closeable { // the device may also respond with two `CLSE` packets. } + async #handleWrite(packet: AdbPacketData) { + const socket = this.#sockets.get(packet.arg1); + if (!socket) { + throw new Error(`Unknown local socket id: ${packet.arg1}`); + } + + await socket.enqueue(packet.payload); + await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0); + return; + } + addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) { this.#incomingSocketHandlers.set(service, handler); } diff --git a/libraries/adb/src/daemon/socket.ts b/libraries/adb/src/daemon/socket.ts index 29ff9301..df88389c 100644 --- a/libraries/adb/src/daemon/socket.ts +++ b/libraries/adb/src/daemon/socket.ts @@ -108,12 +108,7 @@ export class AdbDaemonSocketController (controller) => { this.#readableController = controller; }, - { - highWaterMark: options.highWaterMark ?? 16 * 1024, - size(chunk) { - return chunk.byteLength; - }, - }, + { highWaterMark: 0 }, ), ); diff --git a/libraries/stream-extra/src/buffered.ts b/libraries/stream-extra/src/buffered.ts index 05ac6e2a..f98f38f3 100644 --- a/libraries/stream-extra/src/buffered.ts +++ b/libraries/stream-extra/src/buffered.ts @@ -31,7 +31,6 @@ export class BufferedReadableStream implements AsyncExactReadable { if (done) { throw new ExactReadableEndedError(); } - this.#position += value.byteLength; return value; } @@ -42,45 +41,51 @@ export class BufferedReadableStream implements AsyncExactReadable { if (initial) { result = new Uint8Array(length); result.set(initial); - index = initial.byteLength; - length -= initial.byteLength; + index = initial.length; + length -= initial.length; } else { const array = await this.#readSource(); - if (array.byteLength === length) { + if (array.length === length) { + this.#position += length; return array; } - if (array.byteLength > length) { + if (array.length > length) { this.#buffered = array; this.#bufferedOffset = length; - this.#bufferedLength = array.byteLength - length; + this.#bufferedLength = array.length - length; + this.#position += length; return array.subarray(0, length); } result = new Uint8Array(length); result.set(array); - index = array.byteLength; - length -= array.byteLength; + index = array.length; + length -= array.length; + this.#position += array.length; } while (length > 0) { const array = await this.#readSource(); - if (array.byteLength === length) { + if (array.length === length) { result.set(array, index); + this.#position += length; return result; } - if (array.byteLength > length) { + if (array.length > length) { this.#buffered = array; this.#bufferedOffset = length; - this.#bufferedLength = array.byteLength - length; + this.#bufferedLength = array.length - length; result.set(array.subarray(0, length), index); + this.#position += length; return result; } result.set(array, index); - index += array.byteLength; - length -= array.byteLength; + index += array.length; + length -= array.length; + this.#position += array.length; } return result; @@ -101,12 +106,14 @@ export class BufferedReadableStream implements AsyncExactReadable { // don't use it until absolutely necessary this.#bufferedOffset += length; this.#bufferedLength -= length; + this.#position += length; return array.subarray(offset, offset + length); } this.#buffered = undefined; this.#bufferedLength = 0; this.#bufferedOffset = 0; + this.#position += array.length - offset; return this.#readAsync(length, array.subarray(offset)); } diff --git a/libraries/stream-extra/src/duplex.ts b/libraries/stream-extra/src/duplex.ts index a8d0691f..e6158308 100644 --- a/libraries/stream-extra/src/duplex.ts +++ b/libraries/stream-extra/src/duplex.ts @@ -2,6 +2,7 @@ import { PromiseResolver } from "@yume-chan/async"; import type { ValueOrPromise } from "@yume-chan/struct"; import type { + QueuingStrategy, ReadableStream, ReadableStreamDefaultController, WritableStreamDefaultWriter, @@ -65,21 +66,27 @@ export class DuplexStreamFactory { this.#options = options ?? {}; } - wrapReadable(readable: ReadableStream): WrapReadableStream { - return new WrapReadableStream({ - start: (controller) => { - this.#readableControllers.push(controller); - return readable; + wrapReadable( + readable: ReadableStream, + strategy?: QueuingStrategy, + ): WrapReadableStream { + return new WrapReadableStream( + { + start: (controller) => { + this.#readableControllers.push(controller); + return readable; + }, + cancel: async () => { + // cancel means the local peer wants to close the connection. + await this.close(); + }, + close: async () => { + // stream end means the remote peer closed the connection first. + await this.dispose(); + }, }, - cancel: async () => { - // cancel means the local peer wants to close the connection. - await this.close(); - }, - close: async () => { - // stream end means the remote peer closed the connection first. - await this.dispose(); - }, - }); + strategy, + ); } createWritable(stream: WritableStream): WritableStream { diff --git a/libraries/stream-extra/src/push-readable.ts b/libraries/stream-extra/src/push-readable.ts index 3f530424..ab37b71e 100644 --- a/libraries/stream-extra/src/push-readable.ts +++ b/libraries/stream-extra/src/push-readable.ts @@ -18,6 +18,8 @@ export type PushReadableStreamSource = ( ) => void | Promise; export class PushReadableStream extends ReadableStream { + #zeroHighWaterMarkAllowEnqueue = false; + /** * Create a new `PushReadableStream` from a source. * @@ -34,10 +36,12 @@ export class PushReadableStream extends ReadableStream { super( { - start: (controller) => { + start: async (controller) => { + await Promise.resolve(); + const result = source({ abortSignal: abortController.signal, - async enqueue(chunk) { + enqueue: async (chunk) => { if (abortController.signal.aborted) { // If the stream is already cancelled, // throw immediately. @@ -47,11 +51,20 @@ export class PushReadableStream extends ReadableStream { ); } - // Only when the stream is errored, `desiredSize` will be `null`. - // But since `null <= 0` is `true` - // (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`), - // not handling it will cause a deadlock. - if ((controller.desiredSize ?? 1) <= 0) { + if (controller.desiredSize === null) { + // `desiredSize` being `null` means the stream is in error state, + // `controller.enqueue` will throw an error for us. + controller.enqueue(chunk); + return; + } + + if (this.#zeroHighWaterMarkAllowEnqueue) { + this.#zeroHighWaterMarkAllowEnqueue = false; + controller.enqueue(chunk); + return; + } + + if (controller.desiredSize <= 0) { waterMarkLow = new PromiseResolver(); await waterMarkLow.promise; } @@ -84,7 +97,13 @@ export class PushReadableStream extends ReadableStream { } }, pull: () => { - waterMarkLow?.resolve(); + if (waterMarkLow) { + waterMarkLow.resolve(); + return; + } + if (strategy?.highWaterMark === 0) { + this.#zeroHighWaterMarkAllowEnqueue = true; + } }, cancel: (reason) => { abortController.abort(reason); diff --git a/libraries/stream-extra/src/wrap-readable.ts b/libraries/stream-extra/src/wrap-readable.ts index 221d6f83..3efcf26d 100644 --- a/libraries/stream-extra/src/wrap-readable.ts +++ b/libraries/stream-extra/src/wrap-readable.ts @@ -1,6 +1,7 @@ import type { ValueOrPromise } from "@yume-chan/struct"; import type { + QueuingStrategy, ReadableStreamDefaultController, ReadableStreamDefaultReader, } from "./stream.js"; @@ -51,38 +52,42 @@ export class WrapReadableStream extends ReadableStream { | ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper, + strategy?: QueuingStrategy, ) { - super({ - start: async (controller) => { - // `start` is invoked before `ReadableStream`'s constructor finish, - // so using `this` synchronously causes - // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". - // Queue a microtask to avoid this. - await Promise.resolve(); + super( + { + start: async (controller) => { + // `start` is invoked before `ReadableStream`'s constructor finish, + // so using `this` synchronously causes + // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". + // Queue a microtask to avoid this. + await Promise.resolve(); - this.readable = await getWrappedReadableStream( - wrapper, - controller, - ); - this.#reader = this.readable.getReader(); - }, - cancel: async (reason) => { - await this.#reader.cancel(reason); - if ("cancel" in wrapper) { - await wrapper.cancel?.(reason); - } - }, - pull: async (controller) => { - const result = await this.#reader.read(); - if (result.done) { - controller.close(); - if ("close" in wrapper) { - await wrapper.close?.(); + this.readable = await getWrappedReadableStream( + wrapper, + controller, + ); + this.#reader = this.readable.getReader(); + }, + pull: async (controller) => { + const result = await this.#reader.read(); + if (result.done) { + controller.close(); + if ("close" in wrapper) { + await wrapper.close?.(); + } + } else { + controller.enqueue(result.value); } - } else { - controller.enqueue(result.value); - } + }, + cancel: async (reason) => { + await this.#reader.cancel(reason); + if ("cancel" in wrapper) { + await wrapper.cancel?.(reason); + } + }, }, - }); + strategy, + ); } }