diff --git a/.changeset/five-glasses-attend.md b/.changeset/five-glasses-attend.md new file mode 100644 index 00000000..e4e25280 --- /dev/null +++ b/.changeset/five-glasses-attend.md @@ -0,0 +1,7 @@ +--- +"@yume-chan/adb-daemon-webusb": minor +"@yume-chan/event": minor +"@yume-chan/adb": minor +--- + +Make `DeviceObserver#onListChange` sticky diff --git a/libraries/adb-daemon-webusb/src/observer.ts b/libraries/adb-daemon-webusb/src/observer.ts index 5fa8ae88..a1fd8f4d 100644 --- a/libraries/adb-daemon-webusb/src/observer.ts +++ b/libraries/adb-daemon-webusb/src/observer.ts @@ -1,4 +1,5 @@ import type { DeviceObserver } from "@yume-chan/adb"; +import { unorderedRemove } from "@yume-chan/adb"; import { EventEmitter, StickyEventEmitter } from "@yume-chan/event"; import { @@ -24,20 +25,26 @@ export class AdbDaemonWebUsbDeviceObserver return new AdbDaemonWebUsbDeviceObserver(usb, devices, options); } - #filters: (USBDeviceFilter & UsbInterfaceFilter)[]; - #exclusionFilters?: USBDeviceFilter[] | undefined; - #usbManager: USB; + readonly #filters: (USBDeviceFilter & UsbInterfaceFilter)[]; + readonly #exclusionFilters?: USBDeviceFilter[] | undefined; + readonly #usbManager: USB; - #onDeviceAdd = new EventEmitter(); + readonly #onDeviceAdd = new EventEmitter< + readonly AdbDaemonWebUsbDevice[] + >(); onDeviceAdd = this.#onDeviceAdd.event; - #onDeviceRemove = new EventEmitter(); + readonly #onDeviceRemove = new EventEmitter< + readonly AdbDaemonWebUsbDevice[] + >(); onDeviceRemove = this.#onDeviceRemove.event; - #onListChange = new StickyEventEmitter(); + readonly #onListChange = new StickyEventEmitter< + readonly AdbDaemonWebUsbDevice[] + >(); onListChange = this.#onListChange.event; - current: AdbDaemonWebUsbDevice[] = []; + current: readonly AdbDaemonWebUsbDevice[] = []; constructor( usb: USB, @@ -47,9 +54,12 @@ export class AdbDaemonWebUsbDeviceObserver this.#filters = mergeDefaultAdbInterfaceFilter(options.filters); this.#exclusionFilters = options.exclusionFilters; this.#usbManager = usb; + this.current = initial .map((device) => this.#convertDevice(device)) .filter((device) => !!device); + // Fire `onListChange` to set the sticky value + this.#onListChange.fire(this.current); this.#usbManager.addEventListener("connect", this.#handleConnect); this.#usbManager.addEventListener("disconnect", this.#handleDisconnect); @@ -74,8 +84,11 @@ export class AdbDaemonWebUsbDeviceObserver return; } + const next = this.current.slice(); + next.push(device); + this.current = next; + this.#onDeviceAdd.fire([device]); - this.current.push(device); this.#onListChange.fire(this.current); }; @@ -85,9 +98,12 @@ export class AdbDaemonWebUsbDeviceObserver ); if (index !== -1) { const device = this.current[index]!; + + const next = this.current.slice(); + unorderedRemove(next, index); + this.current = next; + this.#onDeviceRemove.fire([device]); - this.current[index] = this.current[this.current.length - 1]!; - this.current.length -= 1; this.#onListChange.fire(this.current); } }; diff --git a/libraries/adb/src/device-observer.ts b/libraries/adb/src/device-observer.ts index f00ed26c..0af721f4 100644 --- a/libraries/adb/src/device-observer.ts +++ b/libraries/adb/src/device-observer.ts @@ -2,9 +2,9 @@ import type { MaybePromiseLike } from "@yume-chan/async"; import type { Event } from "@yume-chan/event"; export interface DeviceObserver { - onDeviceAdd: Event; - onDeviceRemove: Event; - onListChange: Event; - current: T[]; + readonly onDeviceAdd: Event; + readonly onDeviceRemove: Event; + readonly onListChange: Event; + readonly current: readonly T[]; stop(): MaybePromiseLike; } diff --git a/libraries/adb/src/server/observer.ts b/libraries/adb/src/server/observer.ts index b8ec2f60..82188c19 100644 --- a/libraries/adb/src/server/observer.ts +++ b/libraries/adb/src/server/observer.ts @@ -5,20 +5,23 @@ import { Ref } from "../utils/index.js"; import { AdbServerClient } from "./client.js"; import type { AdbServerStream } from "./stream.js"; -function unorderedRemove(array: T[], index: number) { +export function unorderedRemove(array: T[], index: number) { + if (index < 0 || index >= array.length) { + return; + } array[index] = array[array.length - 1]!; array.length -= 1; } export class AdbServerDeviceObserverOwner { - current: AdbServerClient.Device[] = []; + current: readonly AdbServerClient.Device[] = []; - #client: AdbServerClient; + readonly #client: AdbServerClient; #stream: Promise | undefined; #observers: { - onDeviceAdd: EventEmitter; - onDeviceRemove: EventEmitter; - onListChange: EventEmitter; + onDeviceAdd: EventEmitter; + onDeviceRemove: EventEmitter; + onListChange: EventEmitter; onError: EventEmitter; }[] = []; @@ -27,42 +30,50 @@ export class AdbServerDeviceObserverOwner { } async #receive(stream: AdbServerStream) { + const response = await stream.readString(); + const next = AdbServerClient.parseDeviceList(response); + + const removed = this.current.slice(); + const added: AdbServerClient.Device[] = []; + for (const nextDevice of next) { + const index = removed.findIndex( + (device) => device.transportId === nextDevice.transportId, + ); + + if (index === -1) { + added.push(nextDevice); + continue; + } + + unorderedRemove(removed, index); + } + + this.current = next; + + if (added.length) { + for (const observer of this.#observers) { + observer.onDeviceAdd.fire(added); + } + } + if (removed.length) { + for (const observer of this.#observers) { + observer.onDeviceRemove.fire(removed); + } + } + + for (const observer of this.#observers) { + observer.onListChange.fire(this.current); + } + } + + async #receiveLoop(stream: AdbServerStream) { try { while (true) { - const response = await stream.readString(); - const next = AdbServerClient.parseDeviceList(response); - - const added: AdbServerClient.Device[] = []; - for (const nextDevice of next) { - const index = this.current.findIndex( - (device) => - device.transportId === nextDevice.transportId, - ); - if (index === -1) { - added.push(nextDevice); - continue; - } - - unorderedRemove(this.current, index); - } - - if (added.length) { - for (const observer of this.#observers) { - observer.onDeviceAdd.fire(added); - } - } - if (this.current.length) { - for (const observer of this.#observers) { - observer.onDeviceRemove.fire(this.current); - } - } - - this.current = next; - for (const observer of this.#observers) { - observer.onListChange.fire(this.current); - } + await this.#receive(stream); } } catch (e) { + this.#stream = undefined; + for (const observer of this.#observers) { observer.onError.fire(e as Error); } @@ -76,7 +87,11 @@ export class AdbServerDeviceObserverOwner { { unref: true }, ); - void this.#receive(stream); + // Set `current` and `onListChange` value before returning + await this.#receive(stream); + + // Then start receive loop + void this.#receiveLoop(stream); return stream; } @@ -91,55 +106,65 @@ export class AdbServerDeviceObserverOwner { async createObserver( options?: AdbServerClient.ServerConnectionOptions, ): Promise { - if (options?.signal?.aborted) { - throw options.signal.reason; - } + options?.signal?.throwIfAborted(); - const onDeviceAdd = new EventEmitter(); - const onDeviceRemove = new EventEmitter(); - const onListChange = new StickyEventEmitter(); + const onDeviceAdd = new EventEmitter< + readonly AdbServerClient.Device[] + >(); + const onDeviceRemove = new EventEmitter< + readonly AdbServerClient.Device[] + >(); + const onListChange = new StickyEventEmitter< + readonly AdbServerClient.Device[] + >(); const onError = new StickyEventEmitter(); const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError }; // Register `observer` before `#connect`. - // Because `#connect` might immediately receive some data - // and want to trigger observers + // So `#handleObserverStop` knows if there is any observer. this.#observers.push(observer); - this.#stream ??= this.#connect(); - const stream = await this.#stream; + let stream: AdbServerStream; + if (!this.#stream) { + this.#stream = this.#connect(); - if (options?.signal?.aborted) { - await this.#handleObserverStop(stream); - throw options.signal.reason; + try { + stream = await this.#stream; + } catch (e) { + this.#stream = undefined; + throw e; + } + } else { + stream = await this.#stream; + onListChange.fire(this.current); } const ref = new Ref(options); const stop = async () => { - const index = self.#observers.indexOf(observer); - if (index === -1) { - return; - } - - unorderedRemove(this.#observers, index); - + unorderedRemove(this.#observers, this.#observers.indexOf(observer)); await this.#handleObserverStop(stream); - ref.unref(); }; - options?.signal?.addEventListener("abort", () => void stop()); + if (options?.signal) { + if (options.signal.aborted) { + await stop(); + throw options.signal.reason; + } + + options.signal.addEventListener("abort", () => void stop()); + } // eslint-disable-next-line @typescript-eslint/no-this-alias - const self = this; + const _this = this; return { onDeviceAdd: onDeviceAdd.event, onDeviceRemove: onDeviceRemove.event, onListChange: onListChange.event, onError: onError.event, get current() { - return self.current; + return _this.current; }, stop, }; diff --git a/libraries/event/src/event-emitter.ts b/libraries/event/src/event-emitter.ts index adc25c73..c8251a9d 100644 --- a/libraries/event/src/event-emitter.ts +++ b/libraries/event/src/event-emitter.ts @@ -1,5 +1,5 @@ import type { Disposable } from "./disposable.js"; -import type { EventListener, RemoveEventListener } from "./event.js"; +import type { Event, EventListener, RemoveEventListener } from "./event.js"; export interface EventListenerInfo { listener: EventListener; @@ -9,17 +9,6 @@ export interface EventListenerInfo { args: unknown[]; } -export interface AddEventListener { - ( - listener: EventListener, - ): RemoveEventListener; - ( - listener: EventListener, - thisArg: TThis, - ...args: TArgs - ): RemoveEventListener; -} - export class EventEmitter implements Disposable { protected readonly listeners: EventListenerInfo[] = []; @@ -42,7 +31,7 @@ export class EventEmitter implements Disposable { return remove; } - event: AddEventListener = ( + event: Event = ( listener: EventListener, thisArg?: TThis, ...args: TArgs diff --git a/libraries/event/src/sticky-event-emitter.ts b/libraries/event/src/sticky-event-emitter.ts index 9ee3606a..3591699d 100644 --- a/libraries/event/src/sticky-event-emitter.ts +++ b/libraries/event/src/sticky-event-emitter.ts @@ -2,16 +2,18 @@ import type { EventListenerInfo } from "./event-emitter.js"; import { EventEmitter } from "./event-emitter.js"; import type { RemoveEventListener } from "./event.js"; +const Undefined = Symbol("undefined"); + export class StickyEventEmitter extends EventEmitter< TEvent, TResult > { - #value: TEvent | undefined; + #value: TEvent | typeof Undefined = Undefined; protected override addEventListener( info: EventListenerInfo, ): RemoveEventListener { - if (this.#value) { + if (this.#value !== Undefined) { info.listener.call(info.thisArg, this.#value, ...info.args); } return super.addEventListener(info); diff --git a/libraries/scrcpy-decoder-tinyh264/src/decoder.ts b/libraries/scrcpy-decoder-tinyh264/src/decoder.ts index 3d8fc26d..5b7ced21 100644 --- a/libraries/scrcpy-decoder-tinyh264/src/decoder.ts +++ b/libraries/scrcpy-decoder-tinyh264/src/decoder.ts @@ -1,5 +1,5 @@ import { PromiseResolver } from "@yume-chan/async"; -import { EventEmitter } from "@yume-chan/event"; +import { StickyEventEmitter } from "@yume-chan/event"; import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; import { AndroidAvcLevel, @@ -45,7 +45,7 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { return this.#renderer; } - #sizeChanged = new EventEmitter<{ width: number; height: number }>(); + #sizeChanged = new StickyEventEmitter<{ width: number; height: number }>(); get sizeChanged() { return this.#sizeChanged.event; } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts b/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts index 433ca2cc..bb61953a 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts @@ -1,4 +1,4 @@ -import { EventEmitter } from "@yume-chan/event"; +import { StickyEventEmitter } from "@yume-chan/event"; import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; import { ScrcpyVideoCodecId } from "@yume-chan/scrcpy"; import type { @@ -61,7 +61,7 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder { return this.#framesSkipped; } - #sizeChanged = new EventEmitter<{ width: number; height: number }>(); + #sizeChanged = new StickyEventEmitter<{ width: number; height: number }>(); get sizeChanged() { return this.#sizeChanged.event; }