fix(adb): read socket too slow can cause data loss

This commit is contained in:
Simon Chan 2023-10-11 00:25:07 +08:00
parent dce44ae9ac
commit 1aa7a92d2c
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
7 changed files with 190 additions and 164 deletions

View file

@ -104,6 +104,8 @@ export class AdbDaemonWebUsbConnection
return this.#device; return this.#device;
} }
#inEndpoint: USBEndpoint;
#readable: ReadableStream<AdbPacketData>; #readable: ReadableStream<AdbPacketData>;
get readable() { get readable() {
return this.#readable; return this.#readable;
@ -121,6 +123,7 @@ export class AdbDaemonWebUsbConnection
usbManager: USB, usbManager: USB,
) { ) {
this.#device = device; this.#device = device;
this.#inEndpoint = inEndpoint;
let closed = false; let closed = false;
@ -154,89 +157,19 @@ export class AdbDaemonWebUsbConnection
usbManager.addEventListener("disconnect", handleUsbDisconnect); usbManager.addEventListener("disconnect", handleUsbDisconnect);
this.#readable = duplex.wrapReadable( this.#readable = duplex.wrapReadable(
new ReadableStream<AdbPacketData>({ new ReadableStream<AdbPacketData>(
async pull(controller) { {
try { pull: async (controller) => {
while (true) { const packet = await this.#transferIn();
// The `length` argument in `transferIn` must not be smaller than what the device sent, if (packet) {
// otherwise it will return `babble` status without any data.
// ADB daemon sends each packet in two parts, the 24-byte header and the payload.
const result = await device.raw.transferIn(
inEndpoint.endpointNumber,
24,
);
// Maximum payload size is 1MB, so reading 1MB data will always success,
// and always discards all lingering data.
// FIXME: Chrome on Windows doesn't support babble status. See the HACK below.
if (result.status === "babble") {
await device.raw.transferIn(
inEndpoint.endpointNumber,
1024 * 1024,
);
continue;
}
// Per spec, the `result.data` always covers the whole `buffer`.
const buffer = new Uint8Array(result.data!.buffer);
const stream = new Uint8ArrayExactReadable(buffer);
// Add `payload` field to its type, it's assigned below.
const packet = AdbPacketHeader.deserialize(
stream,
) as AdbPacketHeader & { payload: Uint8Array };
if (packet.payloadLength !== 0) {
// HACK: Chrome on Windows doesn't support babble status,
// so maybe we are not actually reading an ADB packet header.
// Currently the maximum payload size is 1MB,
// so if the payload length is larger than that,
// try to discard the data and receive again.
// https://crbug.com/1314358
if (packet.payloadLength > 1024 * 1024) {
await device.raw.transferIn(
inEndpoint.endpointNumber,
1024 * 1024,
);
continue;
}
const result = await device.raw.transferIn(
inEndpoint.endpointNumber,
packet.payloadLength,
);
packet.payload = new Uint8Array(
result.data!.buffer,
);
} else {
packet.payload = EMPTY_UINT8_ARRAY;
}
controller.enqueue(packet); controller.enqueue(packet);
return; } else {
controller.close();
} }
} catch (e) { },
// On Windows, disconnecting the device will cause `NetworkError` to be thrown,
// even before the `disconnect` event is fired.
// We need to wait a little bit and check if the device is still connected.
// https://github.com/WICG/webusb/issues/219
if (isErrorName(e, "NetworkError")) {
await new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, 100);
});
if (closed) {
controller.close();
} else {
throw e;
}
}
throw e;
}
}, },
}), { highWaterMark: 0 },
),
); );
const zeroMask = outEndpoint.packetSize - 1; const zeroMask = outEndpoint.packetSize - 1;
@ -275,6 +208,67 @@ export class AdbDaemonWebUsbConnection
new AdbPacketSerializeStream(), new AdbPacketSerializeStream(),
); );
} }
async #transferIn(): Promise<AdbPacketData | undefined> {
try {
while (true) {
// ADB daemon sends each packet in two parts, the 24-byte header and the payload.
const result = await this.#device.raw.transferIn(
this.#inEndpoint.endpointNumber,
this.#inEndpoint.packetSize,
);
if (result.data!.byteLength !== 24) {
continue;
}
// Per spec, the `result.data` always covers the whole `buffer`.
const buffer = new Uint8Array(result.data!.buffer);
const stream = new Uint8ArrayExactReadable(buffer);
// Add `payload` field to its type, it's assigned below.
const packet = AdbPacketHeader.deserialize(
stream,
) as AdbPacketHeader & { payload: Uint8Array };
if (packet.magic !== (packet.command ^ 0xffffffff)) {
continue;
}
if (packet.payloadLength !== 0) {
const result = await this.#device.raw.transferIn(
this.#inEndpoint.endpointNumber,
packet.payloadLength,
);
packet.payload = new Uint8Array(result.data!.buffer);
} else {
packet.payload = EMPTY_UINT8_ARRAY;
}
return packet;
}
} catch (e) {
// On Windows, disconnecting the device will cause `NetworkError` to be thrown,
// even before the `disconnect` event is fired.
// We need to wait a little bit and check if the device is still connected.
// https://github.com/WICG/webusb/issues/219
if (isErrorName(e, "NetworkError")) {
await new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, 100);
});
if (closed) {
return undefined;
} else {
throw e;
}
}
throw e;
}
}
} }
export class AdbDaemonWebUsbDevice implements AdbDaemonDevice { export class AdbDaemonWebUsbDevice implements AdbDaemonDevice {

View file

@ -87,20 +87,8 @@ export class AdbPacketDispatcher implements Closeable {
await this.#handleClose(packet); await this.#handleClose(packet);
break; break;
case AdbCommand.Write: case AdbCommand.Write:
if (this.#sockets.has(packet.arg1)) { await this.#handleWrite(packet);
await this.#sockets break;
.get(packet.arg1)!
.enqueue(packet.payload);
await this.sendPacket(
AdbCommand.OK,
packet.arg1,
packet.arg0,
);
break;
}
throw new Error(
`Unknown local socket id: ${packet.arg1}`,
);
case AdbCommand.Open: case AdbCommand.Open:
await this.#handleOpen(packet); await this.#handleOpen(packet);
break; break;
@ -200,6 +188,17 @@ export class AdbPacketDispatcher implements Closeable {
// the device may also respond with two `CLSE` packets. // the device may also respond with two `CLSE` packets.
} }
async #handleWrite(packet: AdbPacketData) {
const socket = this.#sockets.get(packet.arg1);
if (!socket) {
throw new Error(`Unknown local socket id: ${packet.arg1}`);
}
await socket.enqueue(packet.payload);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
return;
}
addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) { addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) {
this.#incomingSocketHandlers.set(service, handler); this.#incomingSocketHandlers.set(service, handler);
} }

View file

@ -108,12 +108,7 @@ export class AdbDaemonSocketController
(controller) => { (controller) => {
this.#readableController = controller; this.#readableController = controller;
}, },
{ { highWaterMark: 0 },
highWaterMark: options.highWaterMark ?? 16 * 1024,
size(chunk) {
return chunk.byteLength;
},
},
), ),
); );

View file

@ -31,7 +31,6 @@ export class BufferedReadableStream implements AsyncExactReadable {
if (done) { if (done) {
throw new ExactReadableEndedError(); throw new ExactReadableEndedError();
} }
this.#position += value.byteLength;
return value; return value;
} }
@ -42,45 +41,51 @@ export class BufferedReadableStream implements AsyncExactReadable {
if (initial) { if (initial) {
result = new Uint8Array(length); result = new Uint8Array(length);
result.set(initial); result.set(initial);
index = initial.byteLength; index = initial.length;
length -= initial.byteLength; length -= initial.length;
} else { } else {
const array = await this.#readSource(); const array = await this.#readSource();
if (array.byteLength === length) { if (array.length === length) {
this.#position += length;
return array; return array;
} }
if (array.byteLength > length) { if (array.length > length) {
this.#buffered = array; this.#buffered = array;
this.#bufferedOffset = length; this.#bufferedOffset = length;
this.#bufferedLength = array.byteLength - length; this.#bufferedLength = array.length - length;
this.#position += length;
return array.subarray(0, length); return array.subarray(0, length);
} }
result = new Uint8Array(length); result = new Uint8Array(length);
result.set(array); result.set(array);
index = array.byteLength; index = array.length;
length -= array.byteLength; length -= array.length;
this.#position += array.length;
} }
while (length > 0) { while (length > 0) {
const array = await this.#readSource(); const array = await this.#readSource();
if (array.byteLength === length) { if (array.length === length) {
result.set(array, index); result.set(array, index);
this.#position += length;
return result; return result;
} }
if (array.byteLength > length) { if (array.length > length) {
this.#buffered = array; this.#buffered = array;
this.#bufferedOffset = length; this.#bufferedOffset = length;
this.#bufferedLength = array.byteLength - length; this.#bufferedLength = array.length - length;
result.set(array.subarray(0, length), index); result.set(array.subarray(0, length), index);
this.#position += length;
return result; return result;
} }
result.set(array, index); result.set(array, index);
index += array.byteLength; index += array.length;
length -= array.byteLength; length -= array.length;
this.#position += array.length;
} }
return result; return result;
@ -101,12 +106,14 @@ export class BufferedReadableStream implements AsyncExactReadable {
// don't use it until absolutely necessary // don't use it until absolutely necessary
this.#bufferedOffset += length; this.#bufferedOffset += length;
this.#bufferedLength -= length; this.#bufferedLength -= length;
this.#position += length;
return array.subarray(offset, offset + length); return array.subarray(offset, offset + length);
} }
this.#buffered = undefined; this.#buffered = undefined;
this.#bufferedLength = 0; this.#bufferedLength = 0;
this.#bufferedOffset = 0; this.#bufferedOffset = 0;
this.#position += array.length - offset;
return this.#readAsync(length, array.subarray(offset)); return this.#readAsync(length, array.subarray(offset));
} }

View file

@ -2,6 +2,7 @@ import { PromiseResolver } from "@yume-chan/async";
import type { ValueOrPromise } from "@yume-chan/struct"; import type { ValueOrPromise } from "@yume-chan/struct";
import type { import type {
QueuingStrategy,
ReadableStream, ReadableStream,
ReadableStreamDefaultController, ReadableStreamDefaultController,
WritableStreamDefaultWriter, WritableStreamDefaultWriter,
@ -65,21 +66,27 @@ export class DuplexStreamFactory<R, W> {
this.#options = options ?? {}; this.#options = options ?? {};
} }
wrapReadable(readable: ReadableStream<R>): WrapReadableStream<R> { wrapReadable(
return new WrapReadableStream<R>({ readable: ReadableStream<R>,
start: (controller) => { strategy?: QueuingStrategy<R>,
this.#readableControllers.push(controller); ): WrapReadableStream<R> {
return readable; return new WrapReadableStream<R>(
{
start: (controller) => {
this.#readableControllers.push(controller);
return readable;
},
cancel: async () => {
// cancel means the local peer wants to close the connection.
await this.close();
},
close: async () => {
// stream end means the remote peer closed the connection first.
await this.dispose();
},
}, },
cancel: async () => { strategy,
// cancel means the local peer wants to close the connection. );
await this.close();
},
close: async () => {
// stream end means the remote peer closed the connection first.
await this.dispose();
},
});
} }
createWritable(stream: WritableStream<W>): WritableStream<W> { createWritable(stream: WritableStream<W>): WritableStream<W> {

View file

@ -18,6 +18,8 @@ export type PushReadableStreamSource<T> = (
) => void | Promise<void>; ) => void | Promise<void>;
export class PushReadableStream<T> extends ReadableStream<T> { export class PushReadableStream<T> extends ReadableStream<T> {
#zeroHighWaterMarkAllowEnqueue = false;
/** /**
* Create a new `PushReadableStream` from a source. * Create a new `PushReadableStream` from a source.
* *
@ -34,10 +36,12 @@ export class PushReadableStream<T> extends ReadableStream<T> {
super( super(
{ {
start: (controller) => { start: async (controller) => {
await Promise.resolve();
const result = source({ const result = source({
abortSignal: abortController.signal, abortSignal: abortController.signal,
async enqueue(chunk) { enqueue: async (chunk) => {
if (abortController.signal.aborted) { if (abortController.signal.aborted) {
// If the stream is already cancelled, // If the stream is already cancelled,
// throw immediately. // throw immediately.
@ -47,11 +51,20 @@ export class PushReadableStream<T> extends ReadableStream<T> {
); );
} }
// Only when the stream is errored, `desiredSize` will be `null`. if (controller.desiredSize === null) {
// But since `null <= 0` is `true` // `desiredSize` being `null` means the stream is in error state,
// (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`), // `controller.enqueue` will throw an error for us.
// not handling it will cause a deadlock. controller.enqueue(chunk);
if ((controller.desiredSize ?? 1) <= 0) { return;
}
if (this.#zeroHighWaterMarkAllowEnqueue) {
this.#zeroHighWaterMarkAllowEnqueue = false;
controller.enqueue(chunk);
return;
}
if (controller.desiredSize <= 0) {
waterMarkLow = new PromiseResolver<void>(); waterMarkLow = new PromiseResolver<void>();
await waterMarkLow.promise; await waterMarkLow.promise;
} }
@ -84,7 +97,13 @@ export class PushReadableStream<T> extends ReadableStream<T> {
} }
}, },
pull: () => { pull: () => {
waterMarkLow?.resolve(); if (waterMarkLow) {
waterMarkLow.resolve();
return;
}
if (strategy?.highWaterMark === 0) {
this.#zeroHighWaterMarkAllowEnqueue = true;
}
}, },
cancel: (reason) => { cancel: (reason) => {
abortController.abort(reason); abortController.abort(reason);

View file

@ -1,6 +1,7 @@
import type { ValueOrPromise } from "@yume-chan/struct"; import type { ValueOrPromise } from "@yume-chan/struct";
import type { import type {
QueuingStrategy,
ReadableStreamDefaultController, ReadableStreamDefaultController,
ReadableStreamDefaultReader, ReadableStreamDefaultReader,
} from "./stream.js"; } from "./stream.js";
@ -51,38 +52,42 @@ export class WrapReadableStream<T> extends ReadableStream<T> {
| ReadableStream<T> | ReadableStream<T>
| WrapReadableStreamStart<T> | WrapReadableStreamStart<T>
| ReadableStreamWrapper<T>, | ReadableStreamWrapper<T>,
strategy?: QueuingStrategy<T>,
) { ) {
super({ super(
start: async (controller) => { {
// `start` is invoked before `ReadableStream`'s constructor finish, start: async (controller) => {
// so using `this` synchronously causes // `start` is invoked before `ReadableStream`'s constructor finish,
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". // so using `this` synchronously causes
// Queue a microtask to avoid this. // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
await Promise.resolve(); // Queue a microtask to avoid this.
await Promise.resolve();
this.readable = await getWrappedReadableStream( this.readable = await getWrappedReadableStream(
wrapper, wrapper,
controller, controller,
); );
this.#reader = this.readable.getReader(); this.#reader = this.readable.getReader();
}, },
cancel: async (reason) => { pull: async (controller) => {
await this.#reader.cancel(reason); const result = await this.#reader.read();
if ("cancel" in wrapper) { if (result.done) {
await wrapper.cancel?.(reason); controller.close();
} if ("close" in wrapper) {
}, await wrapper.close?.();
pull: async (controller) => { }
const result = await this.#reader.read(); } else {
if (result.done) { controller.enqueue(result.value);
controller.close();
if ("close" in wrapper) {
await wrapper.close?.();
} }
} else { },
controller.enqueue(result.value); cancel: async (reason) => {
} await this.#reader.cancel(reason);
if ("cancel" in wrapper) {
await wrapper.cancel?.(reason);
}
},
}, },
}); strategy,
);
} }
} }