diff --git a/.vscode/settings.json b/.vscode/settings.json index 842e96cb..2a0ea936 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,6 +19,8 @@ "Demuxer", "Deserialization", "DESERIALIZERS", + "diskstats", + "dumpsys", "ebml", "Embedder", "entrypoints", diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 32724136..39b52ab9 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -1,3 +1,5 @@ +/// + import "source-map-support/register.js"; import { Adb, AdbServerClient } from "@yume-chan/adb"; diff --git a/apps/demo/next.config.js b/apps/demo/next.config.js index 527f5bda..c04d0214 100644 --- a/apps/demo/next.config.js +++ b/apps/demo/next.config.js @@ -24,9 +24,8 @@ function pipe(value, ...callbacks) { return value; } -/** @type {import('next').NextConfig} */ module.exports = pipe( - { + /** @type {import('next').NextConfig} */ ({ basePath, pageExtensions: ["js", "jsx", "ts", "tsx", "md", "mdx"], reactStrictMode: false, @@ -72,7 +71,8 @@ module.exports = pipe( }, ]; }, - }, + poweredByHeader: false, + }), withBundleAnalyzer, withPwa, withMDX diff --git a/apps/demo/package.json b/apps/demo/package.json index 35af0723..7f885f98 100644 --- a/apps/demo/package.json +++ b/apps/demo/package.json @@ -4,7 +4,7 @@ "private": true, "scripts": { "postinstall": "fetch-scrcpy-server 2.1 && node scripts/manifest.mjs", - "dev": "next dev", + "dev": "next dev -p 5000", "build": "next build", "start": "next start", "lint": "next lint" diff --git a/apps/demo/src/components/scrcpy/settings.tsx b/apps/demo/src/components/scrcpy/settings.tsx index 39c3d03d..b236febe 100644 --- a/apps/demo/src/components/scrcpy/settings.tsx +++ b/apps/demo/src/components/scrcpy/settings.tsx @@ -13,6 +13,7 @@ import { import { makeStyles } from "@griffel/react"; import { AdbSyncError } from "@yume-chan/adb"; import { AdbScrcpyClient, AdbScrcpyOptionsLatest } from "@yume-chan/adb-scrcpy"; +import { VERSION } from "@yume-chan/fetch-scrcpy-server"; import { DEFAULT_SERVER_PATH, ScrcpyDisplay, @@ -26,8 +27,7 @@ import { ScrcpyVideoDecoderConstructor, TinyH264Decoder, } from "@yume-chan/scrcpy-decoder-tinyh264"; -import { VERSION } from "@yume-chan/fetch-scrcpy-server"; -import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra"; +import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra"; import { autorun, computed, @@ -223,12 +223,12 @@ autorun(() => { (async () => { const sync = await GLOBAL_STATE.adb!.sync(); try { - const content = new GatherStringStream(); - await sync - .read(SCRCPY_SETTINGS_FILENAME) - .pipeThrough(new DecodeUtf8Stream()) - .pipeTo(content); - const settings = JSON.parse(content.result); + const settings = JSON.parse( + await sync + .read(SCRCPY_SETTINGS_FILENAME) + .pipeThrough(new DecodeUtf8Stream()) + .pipeThrough(new ConcatStringStream()) + ); runInAction(() => { SETTING_STATE.settings = { ...DEFAULT_SETTINGS, diff --git a/libraries/adb-scrcpy/src/options/1_16.ts b/libraries/adb-scrcpy/src/options/1_16.ts index b1415e88..8f14324a 100644 --- a/libraries/adb-scrcpy/src/options/1_16.ts +++ b/libraries/adb-scrcpy/src/options/1_16.ts @@ -62,14 +62,18 @@ export class AdbScrcpyOptions1_16 extends AdbScrcpyOptionsBase ): Promise { try { - // Server will exit before opening connections when an invalid display id was given. + // Server will exit before opening connections when an invalid display id was given + // so `start` will throw an `AdbScrcpyExitedError` const client = await AdbScrcpyClient.start( adb, path, version, options ); + + // If the server didn't exit, manually stop it and throw an error await client.close(); + throw new Error("Unexpected server output"); } catch (e) { if (e instanceof AdbScrcpyExitedError) { const displays: ScrcpyDisplay[] = []; @@ -81,9 +85,9 @@ export class AdbScrcpyOptions1_16 extends AdbScrcpyOptionsBase - ) { + ): Promise { try { + // Similar to `AdbScrcpyOptions1_16.getDisplays`, + // server start process won't complete and `start `will throw const client = await AdbScrcpyClient.start( adb, path, version, options ); + + // If the server didn't exit, manually stop it and throw an error await client.close(); + throw new Error("Unexpected server output"); } catch (e) { if (e instanceof AdbScrcpyExitedError) { const encoders: ScrcpyEncoder[] = []; @@ -38,8 +43,9 @@ export class AdbScrcpyOptions2_0 extends AdbScrcpyOptionsBase { const socket = await this.createSocket(service); - const gatherStream = new GatherStringStream(); - await socket.readable + return await socket.readable .pipeThrough(new DecodeUtf8Stream()) - .pipeTo(gatherStream); - return gatherStream.result; + .pipeThrough(new ConcatStringStream()); } public async getProp(key: string): Promise { diff --git a/libraries/adb/src/commands/subprocess/command.ts b/libraries/adb/src/commands/subprocess/command.ts index 19ef5e2c..7a17f821 100644 --- a/libraries/adb/src/commands/subprocess/command.ts +++ b/libraries/adb/src/commands/subprocess/command.ts @@ -1,4 +1,4 @@ -import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra"; +import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra"; import { AdbCommandBase } from "../base.js"; @@ -110,18 +110,19 @@ export class AdbSubprocess extends AdbCommandBase { ): Promise { const process = await this.spawn(command, options); - const stdout = new GatherStringStream(); - const stderr = new GatherStringStream(); - - const [, , exitCode] = await Promise.all([ - process.stdout.pipeThrough(new DecodeUtf8Stream()).pipeTo(stdout), - process.stderr.pipeThrough(new DecodeUtf8Stream()).pipeTo(stderr), + const [stdout, stderr, exitCode] = await Promise.all([ + process.stdout + .pipeThrough(new DecodeUtf8Stream()) + .pipeThrough(new ConcatStringStream()), + process.stderr + .pipeThrough(new DecodeUtf8Stream()) + .pipeThrough(new ConcatStringStream()), process.exit, ]); return { - stdout: stdout.result, - stderr: stderr.result, + stdout, + stderr, exitCode, }; } diff --git a/libraries/android-bin/src/cmd.ts b/libraries/android-bin/src/cmd.ts index 2247579d..5ddf9e89 100644 --- a/libraries/android-bin/src/cmd.ts +++ b/libraries/android-bin/src/cmd.ts @@ -10,7 +10,7 @@ import { AdbSubprocessNoneProtocol, AdbSubprocessShellProtocol, } from "@yume-chan/adb"; -import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra"; +import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra"; export class Cmd extends AdbCommandBase { #supportsShellV2: boolean; @@ -82,18 +82,19 @@ export class Cmd extends AdbCommandBase { ): Promise { const process = await this.spawn(true, command, ...args); - const stdout = new GatherStringStream(); - const stderr = new GatherStringStream(); - - const [, , exitCode] = await Promise.all([ - process.stdout.pipeThrough(new DecodeUtf8Stream()).pipeTo(stdout), - process.stderr.pipeThrough(new DecodeUtf8Stream()).pipeTo(stderr), + const [stdout, stderr, exitCode] = await Promise.all([ + process.stdout + .pipeThrough(new DecodeUtf8Stream()) + .pipeThrough(new ConcatStringStream()), + process.stderr + .pipeThrough(new DecodeUtf8Stream()) + .pipeThrough(new ConcatStringStream()), process.exit, ]); return { - stdout: stdout.result, - stderr: stderr.result, + stdout, + stderr, exitCode, }; } diff --git a/libraries/android-bin/src/dumpsys.ts b/libraries/android-bin/src/dumpsys.ts index 7fae0a6e..1ced1fb1 100644 --- a/libraries/android-bin/src/dumpsys.ts +++ b/libraries/android-bin/src/dumpsys.ts @@ -1,3 +1,36 @@ import { AdbCommandBase } from "@yume-chan/adb"; -export class DumpSys extends AdbCommandBase {} +export class DumpSys extends AdbCommandBase { + async diskStats() { + const output = await this.adb.subprocess.spawnAndWaitLegacy([ + "dumpsys", + "diskstats", + ]); + + function getSize(name: string) { + const match = output.match( + new RegExp(`${name}-Free: (\\d+)K / (\\d+)K`) + ); + if (!match) { + return [0, 0]; + } + return [ + Number.parseInt(match[1]!, 10) * 1024, + Number.parseInt(match[2]!, 10) * 1024, + ]; + } + + const [dataFree, dataTotal] = getSize("Data"); + const [cacheFree, cacheTotal] = getSize("Cache"); + const [systemFree, systemTotal] = getSize("System"); + + return { + dataFree, + dataTotal, + cacheFree, + cacheTotal, + systemFree, + systemTotal, + }; + } +} diff --git a/libraries/stream-extra/src/concat.spec.ts b/libraries/stream-extra/src/concat.spec.ts new file mode 100644 index 00000000..ef1286c5 --- /dev/null +++ b/libraries/stream-extra/src/concat.spec.ts @@ -0,0 +1,131 @@ +import { describe, expect, it } from "@jest/globals"; + +import { ConcatBufferStream, ConcatStringStream } from "./concat.js"; +import { ReadableStream } from "./stream.js"; + +describe("ConcatStringStream", () => { + it("should have Promise interface", () => { + const readable = new ConcatStringStream().readable; + expect(readable).toBeInstanceOf(ReadableStream); + expect(readable).toHaveProperty("then", expect.any(Function)); + expect(readable).toHaveProperty("catch", expect.any(Function)); + expect(readable).toHaveProperty("finally", expect.any(Function)); + }); + + it("should resolve to result", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue("foo"); + controller.enqueue("bar"); + controller.close(); + }, + }).pipeThrough(new ConcatStringStream()); + + await expect(readable).resolves.toBe("foobar"); + }); + + it("should read result", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue("foo"); + controller.enqueue("bar"); + controller.close(); + }, + }).pipeThrough(new ConcatStringStream()); + + const reader = readable.getReader(); + await expect(reader.read()).resolves.toEqual({ + done: false, + value: "foobar", + }); + await expect(reader.read()).resolves.toEqual({ + done: true, + value: undefined, + }); + }); + + it("should report error when aborted", async () => { + const stream = new ConcatStringStream(); + const reason = "aborted"; + await stream.writable.getWriter().abort(reason); + await expect(stream.readable).rejects.toBe(reason); + await expect(() => stream.readable.getReader().read()).rejects.toBe( + reason + ); + }); +}); + +describe("ConcatBufferStream", () => { + it("should have Promise interface", () => { + const readable = new ConcatBufferStream().readable; + expect(readable).toBeInstanceOf(ReadableStream); + expect(readable).toHaveProperty("then", expect.any(Function)); + expect(readable).toHaveProperty("catch", expect.any(Function)); + expect(readable).toHaveProperty("finally", expect.any(Function)); + }); + + it("should return empty buffer if no input", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.close(); + }, + }).pipeThrough(new ConcatBufferStream()); + + await expect(readable).resolves.toEqual(new Uint8Array()); + }); + + it("should return one segment", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.close(); + }, + }).pipeThrough(new ConcatBufferStream()); + + await expect(readable).resolves.toEqual(new Uint8Array([1, 2, 3])); + }); + + it("should resolve to result", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.enqueue(new Uint8Array([4, 5, 6])); + controller.close(); + }, + }).pipeThrough(new ConcatBufferStream()); + + await expect(readable).resolves.toEqual( + new Uint8Array([1, 2, 3, 4, 5, 6]) + ); + }); + + it("should read result", async () => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.enqueue(new Uint8Array([4, 5, 6])); + controller.close(); + }, + }).pipeThrough(new ConcatBufferStream()); + + const reader = readable.getReader(); + await expect(reader.read()).resolves.toEqual({ + done: false, + value: new Uint8Array([1, 2, 3, 4, 5, 6]), + }); + await expect(reader.read()).resolves.toEqual({ + done: true, + value: undefined, + }); + }); + + it("should report error when aborted", async () => { + const stream = new ConcatBufferStream(); + const reason = "aborted"; + await stream.writable.getWriter().abort(reason); + await expect(stream.readable).rejects.toBe(reason); + await expect(() => stream.readable.getReader().read()).rejects.toBe( + reason + ); + }); +}); diff --git a/libraries/stream-extra/src/concat.ts b/libraries/stream-extra/src/concat.ts new file mode 100644 index 00000000..835a19b0 --- /dev/null +++ b/libraries/stream-extra/src/concat.ts @@ -0,0 +1,160 @@ +import { PromiseResolver } from "@yume-chan/async"; + +import type { ReadableStreamDefaultController } from "./stream.js"; +import { ReadableStream, WritableStream } from "./stream.js"; + +export interface ConcatStringReadableStream + extends ReadableStream, + Promise {} + +// `TransformStream` only calls its `source.flush` method when its `readable` is being read. +// If the user want to use the `Promise` interface, the `flush` method will never be called, +// so the `PromiseResolver` will never be resolved. +// Thus we need to implement our own `TransformStream` using a `WritableStream` and a `ReadableStream`. + +/** + * A `TransformStream` that concatenates strings. + * + * Its `readable` is also a `Promise`, so it's possible to `await` it to get the result. + * + * ```ts + * const result: string = await readable.pipeThrough(new ConcatStringStream()); + * ``` + */ +export class ConcatStringStream { + // PERF: rope (concat strings) is faster than `[].join('')` + #result = ""; + + #resolver = new PromiseResolver(); + + #writable = new WritableStream({ + write: (chunk) => { + this.#result += chunk; + }, + close: () => { + this.#resolver.resolve(this.#result); + this.#readableController.enqueue(this.#result); + this.#readableController.close(); + }, + abort: (reason) => { + this.#resolver.reject(reason); + this.#readableController.error(reason); + }, + }); + public get writable(): WritableStream { + return this.#writable; + } + + #readableController!: ReadableStreamDefaultController; + #readable = new ReadableStream({ + start: (controller) => { + this.#readableController = controller; + }, + }) as ConcatStringReadableStream; + public get readable(): ConcatStringReadableStream { + return this.#readable; + } + + public constructor() { + void Object.defineProperties(this.#readable, { + then: { + get: () => + this.#resolver.promise.then.bind(this.#resolver.promise), + }, + catch: { + get: () => + this.#resolver.promise.catch.bind(this.#resolver.promise), + }, + finally: { + get: () => + this.#resolver.promise.finally.bind(this.#resolver.promise), + }, + }); + } +} + +export interface ConcatBufferReadableStream + extends ReadableStream, + Promise {} + +/** + * A `TransformStream` that concatenates `Uint8Array`s. + * + * If you want to decode the result as string, + * prefer `.pipeThrough(new DecodeUtf8Stream()).pipeThrough(new ConcatStringStream())`, + * than `.pipeThough(new ConcatBufferStream()).pipeThrough(new DecodeUtf8Stream())`, + * because concatenating strings is faster than concatenating `Uint8Array`s. + */ +export class ConcatBufferStream { + #segments: Uint8Array[] = []; + + #resolver = new PromiseResolver(); + + #writable = new WritableStream({ + write: (chunk) => { + this.#segments.push(chunk); + }, + close: () => { + let result: Uint8Array; + let offset = 0; + switch (this.#segments.length) { + case 0: + result = new Uint8Array(0); + break; + case 1: + result = this.#segments[0]!; + break; + default: + result = new Uint8Array( + this.#segments.reduce( + (prev, item) => prev + item.length, + 0 + ) + ); + for (const segment of this.#segments) { + result.set(segment, offset); + offset += segment.length; + } + break; + } + + this.#resolver.resolve(result); + this.#readableController.enqueue(result); + this.#readableController.close(); + }, + abort: (reason) => { + this.#resolver.reject(reason); + this.#readableController.error(reason); + }, + }); + public get writable(): WritableStream { + return this.#writable; + } + + #readableController!: ReadableStreamDefaultController; + #readable = new ReadableStream({ + start: (controller) => { + this.#readableController = controller; + }, + }) as ConcatBufferReadableStream; + public get readable(): ConcatBufferReadableStream { + return this.#readable; + } + + public constructor() { + void Object.defineProperties(this.#readable, { + then: { + get: () => + this.#resolver.promise.then.bind(this.#resolver.promise), + }, + catch: { + get: () => + this.#resolver.promise.catch.bind(this.#resolver.promise), + }, + finally: { + get: () => + this.#resolver.promise.finally.bind(this.#resolver.promise), + }, + }); + } +} diff --git a/libraries/stream-extra/src/gather-string.ts b/libraries/stream-extra/src/gather-string.ts deleted file mode 100644 index 47656240..00000000 --- a/libraries/stream-extra/src/gather-string.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { WritableStream } from "./stream.js"; - -export class GatherStringStream extends WritableStream { - // PERF: rope (concat strings) is faster than `[].join('')` - #result = ""; - public get result() { - return this.#result; - } - - public constructor() { - super({ - write: (chunk) => { - this.#result += chunk; - }, - }); - } -} diff --git a/libraries/stream-extra/src/index.ts b/libraries/stream-extra/src/index.ts index 838c637d..664394f9 100644 --- a/libraries/stream-extra/src/index.ts +++ b/libraries/stream-extra/src/index.ts @@ -1,10 +1,10 @@ export * from "./buffered-transform.js"; export * from "./buffered.js"; +export * from "./concat.js"; export * from "./consumable.js"; export * from "./decode-utf8.js"; export * from "./distribution.js"; export * from "./duplex.js"; -export * from "./gather-string.js"; export * from "./inspect.js"; export * from "./pipe-from.js"; export * from "./push-readable.js"; diff --git a/libraries/stream-extra/src/stream.ts b/libraries/stream-extra/src/stream.ts index a04f58ea..2d0e6db7 100644 --- a/libraries/stream-extra/src/stream.ts +++ b/libraries/stream-extra/src/stream.ts @@ -45,20 +45,7 @@ export type TransformStream = TransformStreamPolyfill; export let TransformStream = TransformStreamPolyfill; if (GLOBAL.ReadableStream && GLOBAL.WritableStream && GLOBAL.TransformStream) { - // Use browser native implementation ReadableStream = GLOBAL.ReadableStream; WritableStream = GLOBAL.WritableStream; TransformStream = GLOBAL.TransformStream; -} else { - // TODO: enable loading Node.js stream implementation when bundler supports Top Level Await - // try { - // // Use Node.js native implementation - // const MODULE_NAME = "node:stream/web"; - // const StreamWeb = (await import(MODULE_NAME)) as GlobalExtension; - // ReadableStream = StreamWeb.ReadableStream; - // WritableStream = StreamWeb.WritableStream; - // TransformStream = StreamWeb.TransformStream; - // } catch { - // // ignore - // } }