refactor: code cleanup, add tests

This commit is contained in:
Simon Chan 2024-03-05 12:41:48 +08:00
parent cbc0fa8f1e
commit 31656652db
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
4 changed files with 156 additions and 41 deletions

View file

@ -214,9 +214,10 @@ export class AdbPacketDispatcher implements Closeable {
const socket = this.#sockets.get(packet.arg1); const socket = this.#sockets.get(packet.arg1);
if (socket) { if (socket) {
// When delayed ack is enabled, device has received `ackBytes` from the socket. // When delayed ack is enabled, `ackBytes` is a positive number represents
// When delayed ack is disabled, device has received last `WRTE` packet from the socket, // how many bytes the device has received from this socket.
// `ackBytes` is `Infinity` in this case. // When delayed ack is disabled, `ackBytes` is always `Infinity` represents
// the device has received last `WRTE` packet from the socket.
socket.ack(ackBytes); socket.ack(ackBytes);
return; return;
} }
@ -332,7 +333,7 @@ export class AdbPacketDispatcher implements Closeable {
service, service,
); );
// Fulfilled by `handleOk` // Fulfilled by `handleOkay`
const { remoteId, availableWriteBytes } = await initializer; const { remoteId, availableWriteBytes } = await initializer;
const controller = new AdbDaemonSocketController({ const controller = new AdbDaemonSocketController({
dispatcher: this, dispatcher: this,

View file

@ -1,6 +1,7 @@
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import type { Disposable } from "@yume-chan/event"; import type { Disposable } from "@yume-chan/event";
import type { import type {
AbortSignal,
Consumable, Consumable,
PushReadableStreamController, PushReadableStreamController,
ReadableStream, ReadableStream,
@ -13,7 +14,6 @@ import {
} from "@yume-chan/stream-extra"; } from "@yume-chan/stream-extra";
import type { AdbSocket } from "../adb.js"; import type { AdbSocket } from "../adb.js";
import { raceSignal } from "../server/index.js";
import type { AdbPacketDispatcher } from "./dispatcher.js"; import type { AdbPacketDispatcher } from "./dispatcher.js";
import { AdbCommand } from "./packet.js"; import { AdbCommand } from "./packet.js";
@ -105,30 +105,7 @@ export class AdbDaemonSocketController
start = end, end += chunkSize start = end, end += chunkSize
) { ) {
const chunk = data.subarray(start, end); const chunk = data.subarray(start, end);
const length = chunk.byteLength; await this.#writeChunk(chunk, controller.signal);
while (this.#availableWriteBytes < length) {
// Only one lock is required because Web Streams API guarantees
// that `write` is not reentrant.
this.#availableWriteBytesChanged =
new PromiseResolver();
await raceSignal(
() => this.#availableWriteBytesChanged!.promise,
controller.signal,
);
}
if (this.#availableWriteBytes === Infinity) {
this.#availableWriteBytes = -1;
} else {
this.#availableWriteBytes -= length;
}
await this.#dispatcher.sendPacket(
AdbCommand.Write,
this.localId,
this.remoteId,
chunk,
);
} }
}, },
}); });
@ -136,6 +113,34 @@ export class AdbDaemonSocketController
this.#socket = new AdbDaemonSocket(this); this.#socket = new AdbDaemonSocket(this);
} }
async #writeChunk(data: Uint8Array, signal: AbortSignal) {
const length = data.byteLength;
while (this.#availableWriteBytes < length) {
// Only one lock is required because Web Streams API guarantees
// that `write` is not reentrant.
const resolver = new PromiseResolver<void>();
signal.addEventListener("abort", () => {
resolver.reject(signal.reason);
});
this.#availableWriteBytesChanged = resolver;
await resolver.promise;
}
if (this.#availableWriteBytes === Infinity) {
this.#availableWriteBytes = -1;
} else {
this.#availableWriteBytes -= length;
}
await this.#dispatcher.sendPacket(
AdbCommand.Write,
this.localId,
this.remoteId,
data,
);
}
async enqueue(data: Uint8Array) { async enqueue(data: Uint8Array) {
// Consumers can `cancel` the `readable` if they are not interested in future data. // Consumers can `cancel` the `readable` if they are not interested in future data.
// Throw away the data if that happens. // Throw away the data if that happens.

View file

@ -0,0 +1,109 @@
import { describe, expect, it, jest } from "@jest/globals";
import { WritableStream } from "./stream.js";
import { WrapWritableStream } from "./wrap-writable.js";
describe("WrapWritableStream", () => {
describe("start", () => {
it("should accept a WritableStream", async () => {
const stream = new WritableStream();
const wrapper = new WrapWritableStream(stream);
await wrapper.close();
expect(wrapper.writable).toBe(stream);
});
it("should accept a start function", async () => {
const stream = new WritableStream();
const start = jest.fn<() => WritableStream<number>>(() => stream);
const wrapper = new WrapWritableStream(start);
await stream.close();
expect(start).toHaveBeenCalledTimes(1);
expect(wrapper.writable).toBe(stream);
});
it("should accept a start object", async () => {
const stream = new WritableStream();
const start = jest.fn<() => WritableStream<number>>(() => stream);
const wrapper = new WrapWritableStream({ start });
await wrapper.close();
expect(start).toHaveBeenCalledTimes(1);
expect(wrapper.writable).toBe(stream);
});
});
describe("write", () => {
it("should write to inner stream", async () => {
const write = jest.fn<() => void>();
const stream = new WrapWritableStream(
new WritableStream({
write,
}),
);
const writer = stream.getWriter();
const data = {};
await writer.write(data);
await writer.close();
expect(write).toHaveBeenCalledTimes(1);
expect(write).toHaveBeenCalledWith(data, expect.anything());
});
});
describe("close", () => {
it("should close wrapper", async () => {
const close = jest.fn<() => void>();
const stream = new WrapWritableStream({
start() {
return new WritableStream();
},
close,
});
await expect(stream.close()).resolves.toBe(undefined);
expect(close).toHaveBeenCalledTimes(1);
});
it("should close inner stream", async () => {
const close = jest.fn<() => void>();
const stream = new WrapWritableStream(
new WritableStream({
close,
}),
);
await expect(stream.close()).resolves.toBe(undefined);
expect(close).toHaveBeenCalledTimes(1);
});
it("should not close inner stream twice", async () => {
const stream = new WrapWritableStream(new WritableStream());
await expect(stream.close()).resolves.toBe(undefined);
});
});
describe("abort", () => {
it("should close wrapper", async () => {
const close = jest.fn<() => void>();
const stream = new WrapWritableStream({
start() {
return new WritableStream();
},
close,
});
await expect(stream.abort()).resolves.toBe(undefined);
expect(close).toHaveBeenCalledTimes(1);
});
it("should abort inner stream", async () => {
const abort = jest.fn<() => void>();
const stream = new WrapWritableStream(
new WritableStream({
abort,
}),
);
await expect(stream.abort()).resolves.toBe(undefined);
expect(abort).toHaveBeenCalledTimes(1);
});
it("should not close inner stream twice", async () => {
const stream = new WrapWritableStream(new WritableStream());
await expect(stream.abort()).resolves.toBe(undefined);
});
});
});

View file

@ -13,19 +13,19 @@ export interface WritableStreamWrapper<T> {
} }
async function getWrappedWritableStream<T>( async function getWrappedWritableStream<T>(
wrapper: start:
| WritableStream<T> | WritableStream<T>
| WrapWritableStreamStart<T> | WrapWritableStreamStart<T>
| WritableStreamWrapper<T>, | WritableStreamWrapper<T>,
) { ) {
if ("start" in wrapper) { if ("start" in start) {
return await wrapper.start(); return await start.start();
} else if (typeof wrapper === "function") { } else if (typeof start === "function") {
return await wrapper(); return await start();
} else { } else {
// Can't use `wrapper instanceof WritableStream` // Can't use `wrapper instanceof WritableStream`
// Because we want to be compatible with any WritableStream-like objects // Because we want to be compatible with any WritableStream-like objects
return wrapper; return start;
} }
} }
@ -35,7 +35,7 @@ export class WrapWritableStream<T> extends WritableStream<T> {
#writer!: WritableStreamDefaultWriter<T>; #writer!: WritableStreamDefaultWriter<T>;
constructor( constructor(
wrapper: start:
| WritableStream<T> | WritableStream<T>
| WrapWritableStreamStart<T> | WrapWritableStreamStart<T>
| WritableStreamWrapper<T>, | WritableStreamWrapper<T>,
@ -48,7 +48,7 @@ export class WrapWritableStream<T> extends WritableStream<T> {
// Queue a microtask to avoid this. // Queue a microtask to avoid this.
await Promise.resolve(); await Promise.resolve();
this.writable = await getWrappedWritableStream(wrapper); this.writable = await getWrappedWritableStream(start);
this.#writer = this.writable.getWriter(); this.#writer = this.writable.getWriter();
}, },
write: async (chunk) => { write: async (chunk) => {
@ -56,8 +56,8 @@ export class WrapWritableStream<T> extends WritableStream<T> {
}, },
abort: async (reason) => { abort: async (reason) => {
await this.#writer.abort(reason); await this.#writer.abort(reason);
if (wrapper !== this.writable && "close" in wrapper) { if (start !== this.writable && "close" in start) {
await wrapper.close?.(); await start.close?.();
} }
}, },
close: async () => { close: async () => {
@ -66,8 +66,8 @@ export class WrapWritableStream<T> extends WritableStream<T> {
// closing the outer stream first will make the inner stream incapable of // closing the outer stream first will make the inner stream incapable of
// sending data in its `close` handler. // sending data in its `close` handler.
await this.#writer.close(); await this.#writer.close();
if (wrapper !== this.writable && "close" in wrapper) { if (start !== this.writable && "close" in start) {
await wrapper.close?.(); await start.close?.();
} }
}, },
}); });