feat(adb): make DeviceObserver#onListChange sticky

This commit is contained in:
Simon Chan 2025-04-04 01:27:47 +08:00
parent fe06652f52
commit 05c01adbd1
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
8 changed files with 135 additions and 96 deletions

View file

@ -0,0 +1,7 @@
---
"@yume-chan/adb-daemon-webusb": minor
"@yume-chan/event": minor
"@yume-chan/adb": minor
---
Make `DeviceObserver#onListChange` sticky

View file

@ -1,4 +1,5 @@
import type { DeviceObserver } from "@yume-chan/adb"; import type { DeviceObserver } from "@yume-chan/adb";
import { unorderedRemove } from "@yume-chan/adb";
import { EventEmitter, StickyEventEmitter } from "@yume-chan/event"; import { EventEmitter, StickyEventEmitter } from "@yume-chan/event";
import { import {
@ -24,20 +25,26 @@ export class AdbDaemonWebUsbDeviceObserver
return new AdbDaemonWebUsbDeviceObserver(usb, devices, options); return new AdbDaemonWebUsbDeviceObserver(usb, devices, options);
} }
#filters: (USBDeviceFilter & UsbInterfaceFilter)[]; readonly #filters: (USBDeviceFilter & UsbInterfaceFilter)[];
#exclusionFilters?: USBDeviceFilter[] | undefined; readonly #exclusionFilters?: USBDeviceFilter[] | undefined;
#usbManager: USB; readonly #usbManager: USB;
#onDeviceAdd = new EventEmitter<AdbDaemonWebUsbDevice[]>(); readonly #onDeviceAdd = new EventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onDeviceAdd = this.#onDeviceAdd.event; onDeviceAdd = this.#onDeviceAdd.event;
#onDeviceRemove = new EventEmitter<AdbDaemonWebUsbDevice[]>(); readonly #onDeviceRemove = new EventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onDeviceRemove = this.#onDeviceRemove.event; onDeviceRemove = this.#onDeviceRemove.event;
#onListChange = new StickyEventEmitter<AdbDaemonWebUsbDevice[]>(); readonly #onListChange = new StickyEventEmitter<
readonly AdbDaemonWebUsbDevice[]
>();
onListChange = this.#onListChange.event; onListChange = this.#onListChange.event;
current: AdbDaemonWebUsbDevice[] = []; current: readonly AdbDaemonWebUsbDevice[] = [];
constructor( constructor(
usb: USB, usb: USB,
@ -47,9 +54,12 @@ export class AdbDaemonWebUsbDeviceObserver
this.#filters = mergeDefaultAdbInterfaceFilter(options.filters); this.#filters = mergeDefaultAdbInterfaceFilter(options.filters);
this.#exclusionFilters = options.exclusionFilters; this.#exclusionFilters = options.exclusionFilters;
this.#usbManager = usb; this.#usbManager = usb;
this.current = initial this.current = initial
.map((device) => this.#convertDevice(device)) .map((device) => this.#convertDevice(device))
.filter((device) => !!device); .filter((device) => !!device);
// Fire `onListChange` to set the sticky value
this.#onListChange.fire(this.current);
this.#usbManager.addEventListener("connect", this.#handleConnect); this.#usbManager.addEventListener("connect", this.#handleConnect);
this.#usbManager.addEventListener("disconnect", this.#handleDisconnect); this.#usbManager.addEventListener("disconnect", this.#handleDisconnect);
@ -74,8 +84,11 @@ export class AdbDaemonWebUsbDeviceObserver
return; return;
} }
const next = this.current.slice();
next.push(device);
this.current = next;
this.#onDeviceAdd.fire([device]); this.#onDeviceAdd.fire([device]);
this.current.push(device);
this.#onListChange.fire(this.current); this.#onListChange.fire(this.current);
}; };
@ -85,9 +98,12 @@ export class AdbDaemonWebUsbDeviceObserver
); );
if (index !== -1) { if (index !== -1) {
const device = this.current[index]!; const device = this.current[index]!;
const next = this.current.slice();
unorderedRemove(next, index);
this.current = next;
this.#onDeviceRemove.fire([device]); this.#onDeviceRemove.fire([device]);
this.current[index] = this.current[this.current.length - 1]!;
this.current.length -= 1;
this.#onListChange.fire(this.current); this.#onListChange.fire(this.current);
} }
}; };

View file

@ -2,9 +2,9 @@ import type { MaybePromiseLike } from "@yume-chan/async";
import type { Event } from "@yume-chan/event"; import type { Event } from "@yume-chan/event";
export interface DeviceObserver<T> { export interface DeviceObserver<T> {
onDeviceAdd: Event<T[]>; readonly onDeviceAdd: Event<readonly T[]>;
onDeviceRemove: Event<T[]>; readonly onDeviceRemove: Event<readonly T[]>;
onListChange: Event<T[]>; readonly onListChange: Event<readonly T[]>;
current: T[]; readonly current: readonly T[];
stop(): MaybePromiseLike<void>; stop(): MaybePromiseLike<void>;
} }

View file

@ -5,20 +5,23 @@ import { Ref } from "../utils/index.js";
import { AdbServerClient } from "./client.js"; import { AdbServerClient } from "./client.js";
import type { AdbServerStream } from "./stream.js"; import type { AdbServerStream } from "./stream.js";
function unorderedRemove<T>(array: T[], index: number) { export function unorderedRemove<T>(array: T[], index: number) {
if (index < 0 || index >= array.length) {
return;
}
array[index] = array[array.length - 1]!; array[index] = array[array.length - 1]!;
array.length -= 1; array.length -= 1;
} }
export class AdbServerDeviceObserverOwner { export class AdbServerDeviceObserverOwner {
current: AdbServerClient.Device[] = []; current: readonly AdbServerClient.Device[] = [];
#client: AdbServerClient; readonly #client: AdbServerClient;
#stream: Promise<AdbServerStream> | undefined; #stream: Promise<AdbServerStream> | undefined;
#observers: { #observers: {
onDeviceAdd: EventEmitter<AdbServerClient.Device[]>; onDeviceAdd: EventEmitter<readonly AdbServerClient.Device[]>;
onDeviceRemove: EventEmitter<AdbServerClient.Device[]>; onDeviceRemove: EventEmitter<readonly AdbServerClient.Device[]>;
onListChange: EventEmitter<AdbServerClient.Device[]>; onListChange: EventEmitter<readonly AdbServerClient.Device[]>;
onError: EventEmitter<Error>; onError: EventEmitter<Error>;
}[] = []; }[] = [];
@ -27,42 +30,50 @@ export class AdbServerDeviceObserverOwner {
} }
async #receive(stream: AdbServerStream) { async #receive(stream: AdbServerStream) {
const response = await stream.readString();
const next = AdbServerClient.parseDeviceList(response);
const removed = this.current.slice();
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = removed.findIndex(
(device) => device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
unorderedRemove(removed, index);
}
this.current = next;
if (added.length) {
for (const observer of this.#observers) {
observer.onDeviceAdd.fire(added);
}
}
if (removed.length) {
for (const observer of this.#observers) {
observer.onDeviceRemove.fire(removed);
}
}
for (const observer of this.#observers) {
observer.onListChange.fire(this.current);
}
}
async #receiveLoop(stream: AdbServerStream) {
try { try {
while (true) { while (true) {
const response = await stream.readString(); await this.#receive(stream);
const next = AdbServerClient.parseDeviceList(response);
const added: AdbServerClient.Device[] = [];
for (const nextDevice of next) {
const index = this.current.findIndex(
(device) =>
device.transportId === nextDevice.transportId,
);
if (index === -1) {
added.push(nextDevice);
continue;
}
unorderedRemove(this.current, index);
}
if (added.length) {
for (const observer of this.#observers) {
observer.onDeviceAdd.fire(added);
}
}
if (this.current.length) {
for (const observer of this.#observers) {
observer.onDeviceRemove.fire(this.current);
}
}
this.current = next;
for (const observer of this.#observers) {
observer.onListChange.fire(this.current);
}
} }
} catch (e) { } catch (e) {
this.#stream = undefined;
for (const observer of this.#observers) { for (const observer of this.#observers) {
observer.onError.fire(e as Error); observer.onError.fire(e as Error);
} }
@ -76,7 +87,11 @@ export class AdbServerDeviceObserverOwner {
{ unref: true }, { unref: true },
); );
void this.#receive(stream); // Set `current` and `onListChange` value before returning
await this.#receive(stream);
// Then start receive loop
void this.#receiveLoop(stream);
return stream; return stream;
} }
@ -91,55 +106,65 @@ export class AdbServerDeviceObserverOwner {
async createObserver( async createObserver(
options?: AdbServerClient.ServerConnectionOptions, options?: AdbServerClient.ServerConnectionOptions,
): Promise<AdbServerClient.DeviceObserver> { ): Promise<AdbServerClient.DeviceObserver> {
if (options?.signal?.aborted) { options?.signal?.throwIfAborted();
throw options.signal.reason;
}
const onDeviceAdd = new EventEmitter<AdbServerClient.Device[]>(); const onDeviceAdd = new EventEmitter<
const onDeviceRemove = new EventEmitter<AdbServerClient.Device[]>(); readonly AdbServerClient.Device[]
const onListChange = new StickyEventEmitter<AdbServerClient.Device[]>(); >();
const onDeviceRemove = new EventEmitter<
readonly AdbServerClient.Device[]
>();
const onListChange = new StickyEventEmitter<
readonly AdbServerClient.Device[]
>();
const onError = new StickyEventEmitter<Error>(); const onError = new StickyEventEmitter<Error>();
const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError }; const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError };
// Register `observer` before `#connect`. // Register `observer` before `#connect`.
// Because `#connect` might immediately receive some data // So `#handleObserverStop` knows if there is any observer.
// and want to trigger observers
this.#observers.push(observer); this.#observers.push(observer);
this.#stream ??= this.#connect(); let stream: AdbServerStream;
const stream = await this.#stream; if (!this.#stream) {
this.#stream = this.#connect();
if (options?.signal?.aborted) { try {
await this.#handleObserverStop(stream); stream = await this.#stream;
throw options.signal.reason; } catch (e) {
this.#stream = undefined;
throw e;
}
} else {
stream = await this.#stream;
onListChange.fire(this.current);
} }
const ref = new Ref(options); const ref = new Ref(options);
const stop = async () => { const stop = async () => {
const index = self.#observers.indexOf(observer); unorderedRemove(this.#observers, this.#observers.indexOf(observer));
if (index === -1) {
return;
}
unorderedRemove(this.#observers, index);
await this.#handleObserverStop(stream); await this.#handleObserverStop(stream);
ref.unref(); ref.unref();
}; };
options?.signal?.addEventListener("abort", () => void stop()); if (options?.signal) {
if (options.signal.aborted) {
await stop();
throw options.signal.reason;
}
options.signal.addEventListener("abort", () => void stop());
}
// eslint-disable-next-line @typescript-eslint/no-this-alias // eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this; const _this = this;
return { return {
onDeviceAdd: onDeviceAdd.event, onDeviceAdd: onDeviceAdd.event,
onDeviceRemove: onDeviceRemove.event, onDeviceRemove: onDeviceRemove.event,
onListChange: onListChange.event, onListChange: onListChange.event,
onError: onError.event, onError: onError.event,
get current() { get current() {
return self.current; return _this.current;
}, },
stop, stop,
}; };

View file

@ -1,5 +1,5 @@
import type { Disposable } from "./disposable.js"; import type { Disposable } from "./disposable.js";
import type { EventListener, RemoveEventListener } from "./event.js"; import type { Event, EventListener, RemoveEventListener } from "./event.js";
export interface EventListenerInfo<TEvent, TResult = unknown> { export interface EventListenerInfo<TEvent, TResult = unknown> {
listener: EventListener<TEvent, unknown, unknown[], TResult>; listener: EventListener<TEvent, unknown, unknown[], TResult>;
@ -9,17 +9,6 @@ export interface EventListenerInfo<TEvent, TResult = unknown> {
args: unknown[]; args: unknown[];
} }
export interface AddEventListener<TEvent, TResult = unknown> {
(
listener: EventListener<TEvent, unknown, [], TResult>,
): RemoveEventListener;
<TThis, TArgs extends unknown[]>(
listener: EventListener<TEvent, TThis, TArgs, TResult>,
thisArg: TThis,
...args: TArgs
): RemoveEventListener;
}
export class EventEmitter<TEvent, TResult = unknown> implements Disposable { export class EventEmitter<TEvent, TResult = unknown> implements Disposable {
protected readonly listeners: EventListenerInfo<TEvent, TResult>[] = []; protected readonly listeners: EventListenerInfo<TEvent, TResult>[] = [];
@ -42,7 +31,7 @@ export class EventEmitter<TEvent, TResult = unknown> implements Disposable {
return remove; return remove;
} }
event: AddEventListener<TEvent, TResult> = <TThis, TArgs extends unknown[]>( event: Event<TEvent, TResult> = <TThis, TArgs extends unknown[]>(
listener: EventListener<TEvent, TThis, TArgs, TResult>, listener: EventListener<TEvent, TThis, TArgs, TResult>,
thisArg?: TThis, thisArg?: TThis,
...args: TArgs ...args: TArgs

View file

@ -2,16 +2,18 @@ import type { EventListenerInfo } from "./event-emitter.js";
import { EventEmitter } from "./event-emitter.js"; import { EventEmitter } from "./event-emitter.js";
import type { RemoveEventListener } from "./event.js"; import type { RemoveEventListener } from "./event.js";
const Undefined = Symbol("undefined");
export class StickyEventEmitter<TEvent, TResult = unknown> extends EventEmitter< export class StickyEventEmitter<TEvent, TResult = unknown> extends EventEmitter<
TEvent, TEvent,
TResult TResult
> { > {
#value: TEvent | undefined; #value: TEvent | typeof Undefined = Undefined;
protected override addEventListener( protected override addEventListener(
info: EventListenerInfo<TEvent, TResult>, info: EventListenerInfo<TEvent, TResult>,
): RemoveEventListener { ): RemoveEventListener {
if (this.#value) { if (this.#value !== Undefined) {
info.listener.call(info.thisArg, this.#value, ...info.args); info.listener.call(info.thisArg, this.#value, ...info.args);
} }
return super.addEventListener(info); return super.addEventListener(info);

View file

@ -1,5 +1,5 @@
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import { EventEmitter } from "@yume-chan/event"; import { StickyEventEmitter } from "@yume-chan/event";
import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy";
import { import {
AndroidAvcLevel, AndroidAvcLevel,
@ -45,7 +45,7 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder {
return this.#renderer; return this.#renderer;
} }
#sizeChanged = new EventEmitter<{ width: number; height: number }>(); #sizeChanged = new StickyEventEmitter<{ width: number; height: number }>();
get sizeChanged() { get sizeChanged() {
return this.#sizeChanged.event; return this.#sizeChanged.event;
} }

View file

@ -1,4 +1,4 @@
import { EventEmitter } from "@yume-chan/event"; import { StickyEventEmitter } from "@yume-chan/event";
import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy";
import { ScrcpyVideoCodecId } from "@yume-chan/scrcpy"; import { ScrcpyVideoCodecId } from "@yume-chan/scrcpy";
import type { import type {
@ -61,7 +61,7 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder {
return this.#framesSkipped; return this.#framesSkipped;
} }
#sizeChanged = new EventEmitter<{ width: number; height: number }>(); #sizeChanged = new StickyEventEmitter<{ width: number; height: number }>();
get sizeChanged() { get sizeChanged() {
return this.#sizeChanged.event; return this.#sizeChanged.event;
} }