feat(adb): make DeviceObserver#onListChange sticky

This commit is contained in:
Simon Chan 2025-04-04 01:27:47 +08:00
parent fe06652f52
commit 05c01adbd1
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
8 changed files with 135 additions and 96 deletions

View file

@ -0,0 +1,7 @@
---
"@yume-chan/adb-daemon-webusb": minor
"@yume-chan/event": minor
"@yume-chan/adb": minor
---
Make `DeviceObserver#onListChange` sticky

View file

@ -1,4 +1,5 @@
import type { DeviceObserver } from "@yume-chan/adb";
import { unorderedRemove } from "@yume-chan/adb";
import { EventEmitter, StickyEventEmitter } from "@yume-chan/event";
import {
@ -24,20 +25,26 @@ export class AdbDaemonWebUsbDeviceObserver
return new AdbDaemonWebUsbDeviceObserver(usb, devices, options);
}
#filters: (USBDeviceFilter & UsbInterfaceFilter)[];
#exclusionFilters?: USBDeviceFilter[] | undefined;
#usbManager: USB;
readonly #filters: (USBDeviceFilter & UsbInterfaceFilter)[];
readonly #exclusionFilters?: USBDeviceFilter[] | undefined;
readonly #usbManager: USB;
#onDeviceAdd = new EventEmitter<AdbDaemonWebUsbDevice[]>();
readonly #onDeviceAdd = new EventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onDeviceAdd = this.#onDeviceAdd.event;
#onDeviceRemove = new EventEmitter<AdbDaemonWebUsbDevice[]>();
readonly #onDeviceRemove = new EventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onDeviceRemove = this.#onDeviceRemove.event;
#onListChange = new StickyEventEmitter<AdbDaemonWebUsbDevice[]>();
readonly #onListChange = new StickyEventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onListChange = this.#onListChange.event;
current: AdbDaemonWebUsbDevice[] = [];
current: readonly AdbDaemonWebUsbDevice[] = [];
constructor(
usb: USB,
@ -47,9 +54,12 @@ export class AdbDaemonWebUsbDeviceObserver
this.#filters = mergeDefaultAdbInterfaceFilter(options.filters);
this.#exclusionFilters = options.exclusionFilters;
this.#usbManager = usb;
this.current = initial
.map((device) => this.#convertDevice(device))
.filter((device) => !!device);
// Fire `onListChange` to set the sticky value
this.#onListChange.fire(this.current);
this.#usbManager.addEventListener("connect", this.#handleConnect);
this.#usbManager.addEventListener("disconnect", this.#handleDisconnect);
@ -74,8 +84,11 @@ export class AdbDaemonWebUsbDeviceObserver
return;
}
const next = this.current.slice();
next.push(device);
this.current = next;
this.#onDeviceAdd.fire([device]);
this.current.push(device);
this.#onListChange.fire(this.current);
};
@ -85,9 +98,12 @@ export class AdbDaemonWebUsbDeviceObserver
);
if (index !== -1) {
const device = this.current[index]!;
const next = this.current.slice();
unorderedRemove(next, index);
this.current = next;
this.#onDeviceRemove.fire([device]);
this.current[index] = this.current[this.current.length - 1]!;
this.current.length -= 1;
this.#onListChange.fire(this.current);
}
};

View file

@ -2,9 +2,9 @@ import type { MaybePromiseLike } from "@yume-chan/async";
import type { Event } from "@yume-chan/event";
export interface DeviceObserver<T> {
onDeviceAdd: Event<T[]>;
onDeviceRemove: Event<T[]>;
onListChange: Event<T[]>;
current: T[];
readonly onDeviceAdd: Event<readonly T[]>;
readonly onDeviceRemove: Event<readonly T[]>;
readonly onListChange: Event<readonly T[]>;
readonly current: readonly T[];
stop(): MaybePromiseLike<void>;
}

View file

@ -5,20 +5,23 @@ import { Ref } from "../utils/index.js";
import { AdbServerClient } from "./client.js";
import type { AdbServerStream } from "./stream.js";
function unorderedRemove<T>(array: T[], index: number) {
export function unorderedRemove<T>(array: T[], index: number) {
if (index < 0 || index >= array.length) {
return;
}
array[index] = array[array.length - 1]!;
array.length -= 1;
}
export class AdbServerDeviceObserverOwner {
current: AdbServerClient.Device[] = [];
current: readonly AdbServerClient.Device[] = [];
#client: AdbServerClient;
readonly #client: AdbServerClient;
#stream: Promise<AdbServerStream> | undefined;
#observers: {
onDeviceAdd: EventEmitter<AdbServerClient.Device[]>;
onDeviceRemove: EventEmitter<AdbServerClient.Device[]>;
onListChange: EventEmitter<AdbServerClient.Device[]>;
onDeviceAdd: EventEmitter<readonly AdbServerClient.Device[]>;
onDeviceRemove: EventEmitter<readonly AdbServerClient.Device[]>;
onListChange: EventEmitter<readonly AdbServerClient.Device[]>;
onError: EventEmitter<Error>;
}[] = [];
@ -27,42 +30,50 @@ export class AdbServerDeviceObserverOwner {
}
async #receive(stream: AdbServerStream) {
const response = await stream.readString();
const next = AdbServerClient.parseDeviceList(response);
const removed = this.current.slice();
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = removed.findIndex(
(device) => device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
unorderedRemove(removed, index);
}
this.current = next;
if (added.length) {
for (const observer of this.#observers) {
observer.onDeviceAdd.fire(added);
}
}
if (removed.length) {
for (const observer of this.#observers) {
observer.onDeviceRemove.fire(removed);
}
}
for (const observer of this.#observers) {
observer.onListChange.fire(this.current);
}
}
async #receiveLoop(stream: AdbServerStream) {
try {
while (true) {
const response = await stream.readString();
const next = AdbServerClient.parseDeviceList(response);
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = this.current.findIndex(
(device) =>
device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
unorderedRemove(this.current, index);
}
if (added.length) {
for (const observer of this.#observers) {
observer.onDeviceAdd.fire(added);
}
}
if (this.current.length) {
for (const observer of this.#observers) {
observer.onDeviceRemove.fire(this.current);
}
}
this.current = next;
for (const observer of this.#observers) {
observer.onListChange.fire(this.current);
}
await this.#receive(stream);
}
} catch (e) {
this.#stream = undefined;
for (const observer of this.#observers) {
observer.onError.fire(e as Error);
}
@ -76,7 +87,11 @@ export class AdbServerDeviceObserverOwner {
{ unref: true },
);
void this.#receive(stream);
// Set `current` and `onListChange` value before returning
await this.#receive(stream);
// Then start receive loop
void this.#receiveLoop(stream);
return stream;
}
@ -91,55 +106,65 @@ export class AdbServerDeviceObserverOwner {
async createObserver(
options?: AdbServerClient.ServerConnectionOptions,
): Promise<AdbServerClient.DeviceObserver> {
if (options?.signal?.aborted) {
throw options.signal.reason;
}
options?.signal?.throwIfAborted();
const onDeviceAdd = new EventEmitter<AdbServerClient.Device[]>();
const onDeviceRemove = new EventEmitter<AdbServerClient.Device[]>();
const onListChange = new StickyEventEmitter<AdbServerClient.Device[]>();
const onDeviceAdd = new EventEmitter<
readonly AdbServerClient.Device[]
>();
const onDeviceRemove = new EventEmitter<
readonly AdbServerClient.Device[]
>();
const onListChange = new StickyEventEmitter<
readonly AdbServerClient.Device[]
>();
const onError = new StickyEventEmitter<Error>();
const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError };
// Register `observer` before `#connect`.
// Because `#connect` might immediately receive some data
// and want to trigger observers
// So `#handleObserverStop` knows if there is any observer.
this.#observers.push(observer);
this.#stream ??= this.#connect();
const stream = await this.#stream;
let stream: AdbServerStream;
if (!this.#stream) {
this.#stream = this.#connect();
if (options?.signal?.aborted) {
await this.#handleObserverStop(stream);
throw options.signal.reason;
try {
stream = await this.#stream;
} catch (e) {
this.#stream = undefined;
throw e;
}
} else {
stream = await this.#stream;
onListChange.fire(this.current);
}
const ref = new Ref(options);
const stop = async () => {
const index = self.#observers.indexOf(observer);
if (index === -1) {
return;
}
unorderedRemove(this.#observers, index);
unorderedRemove(this.#observers, this.#observers.indexOf(observer));
await this.#handleObserverStop(stream);
ref.unref();
};
options?.signal?.addEventListener("abort", () => void stop());
if (options?.signal) {
if (options.signal.aborted) {
await stop();
throw options.signal.reason;
}
options.signal.addEventListener("abort", () => void stop());
}
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const _this = this;
return {
onDeviceAdd: onDeviceAdd.event,
onDeviceRemove: onDeviceRemove.event,
onListChange: onListChange.event,
onError: onError.event,
get current() {
return self.current;
return _this.current;
},
stop,
};

View file

@ -1,5 +1,5 @@
import type { Disposable } from "./disposable.js";
import type { EventListener, RemoveEventListener } from "./event.js";
import type { Event, EventListener, RemoveEventListener } from "./event.js";
export interface EventListenerInfo<TEvent, TResult = unknown> {
listener: EventListener<TEvent, unknown, unknown[], TResult>;
@ -9,17 +9,6 @@ export interface EventListenerInfo<TEvent, TResult = unknown> {
args: unknown[];
}
export interface AddEventListener<TEvent, TResult = unknown> {
(
listener: EventListener<TEvent, unknown, [], TResult>,
): RemoveEventListener;
<TThis, TArgs extends unknown[]>(
listener: EventListener<TEvent, TThis, TArgs, TResult>,
thisArg: TThis,
...args: TArgs
): RemoveEventListener;
}
export class EventEmitter<TEvent, TResult = unknown> implements Disposable {
protected readonly listeners: EventListenerInfo<TEvent, TResult>[] = [];
@ -42,7 +31,7 @@ export class EventEmitter<TEvent, TResult = unknown> implements Disposable {
return remove;
}
event: AddEventListener<TEvent, TResult> = <TThis, TArgs extends unknown[]>(
event: Event<TEvent, TResult> = <TThis, TArgs extends unknown[]>(
listener: EventListener<TEvent, TThis, TArgs, TResult>,
thisArg?: TThis,
...args: TArgs

View file

@ -2,16 +2,18 @@ import type { EventListenerInfo } from "./event-emitter.js";
import { EventEmitter } from "./event-emitter.js";
import type { RemoveEventListener } from "./event.js";
const Undefined = Symbol("undefined");
export class StickyEventEmitter<TEvent, TResult = unknown> extends EventEmitter<
TEvent,
TResult
> {
#value: TEvent | undefined;
#value: TEvent | typeof Undefined = Undefined;
protected override addEventListener(
info: EventListenerInfo<TEvent, TResult>,
): RemoveEventListener {
if (this.#value) {
if (this.#value !== Undefined) {
info.listener.call(info.thisArg, this.#value, ...info.args);
}
return super.addEventListener(info);

View file

@ -1,5 +1,5 @@
import { PromiseResolver } from "@yume-chan/async";
import { EventEmitter } from "@yume-chan/event";
import { StickyEventEmitter } from "@yume-chan/event";
import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy";
import {
AndroidAvcLevel,
@ -45,7 +45,7 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder {
return this.#renderer;
}
#sizeChanged = new EventEmitter<{ width: number; height: number }>();
#sizeChanged = new StickyEventEmitter<{ width: number; height: number }>();
get sizeChanged() {
return this.#sizeChanged.event;
}

View file

@ -1,4 +1,4 @@
import { EventEmitter } from "@yume-chan/event";
import { StickyEventEmitter } from "@yume-chan/event";
import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy";
import { ScrcpyVideoCodecId } from "@yume-chan/scrcpy";
import type {
@ -61,7 +61,7 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder {
return this.#framesSkipped;
}
#sizeChanged = new EventEmitter<{ width: number; height: number }>();
#sizeChanged = new StickyEventEmitter<{ width: number; height: number }>();
get sizeChanged() {
return this.#sizeChanged.event;
}