feat(stream): improve ConcatStringStream

This commit is contained in:
Simon Chan 2023-07-06 15:07:08 +08:00
parent e3bfd1592f
commit 721e14fa7a
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
16 changed files with 380 additions and 72 deletions

View file

@ -19,6 +19,8 @@
"Demuxer",
"Deserialization",
"DESERIALIZERS",
"diskstats",
"dumpsys",
"ebml",
"Embedder",
"entrypoints",

View file

@ -1,3 +1,5 @@
/// <reference types="node" />
import "source-map-support/register.js";
import { Adb, AdbServerClient } from "@yume-chan/adb";

View file

@ -24,9 +24,8 @@ function pipe(value, ...callbacks) {
return value;
}
/** @type {import('next').NextConfig} */
module.exports = pipe(
{
/** @type {import('next').NextConfig} */ ({
basePath,
pageExtensions: ["js", "jsx", "ts", "tsx", "md", "mdx"],
reactStrictMode: false,
@ -72,7 +71,8 @@ module.exports = pipe(
},
];
},
},
poweredByHeader: false,
}),
withBundleAnalyzer,
withPwa,
withMDX

View file

@ -4,7 +4,7 @@
"private": true,
"scripts": {
"postinstall": "fetch-scrcpy-server 2.1 && node scripts/manifest.mjs",
"dev": "next dev",
"dev": "next dev -p 5000",
"build": "next build",
"start": "next start",
"lint": "next lint"

View file

@ -13,6 +13,7 @@ import {
import { makeStyles } from "@griffel/react";
import { AdbSyncError } from "@yume-chan/adb";
import { AdbScrcpyClient, AdbScrcpyOptionsLatest } from "@yume-chan/adb-scrcpy";
import { VERSION } from "@yume-chan/fetch-scrcpy-server";
import {
DEFAULT_SERVER_PATH,
ScrcpyDisplay,
@ -26,8 +27,7 @@ import {
ScrcpyVideoDecoderConstructor,
TinyH264Decoder,
} from "@yume-chan/scrcpy-decoder-tinyh264";
import { VERSION } from "@yume-chan/fetch-scrcpy-server";
import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
import {
autorun,
computed,
@ -223,12 +223,12 @@ autorun(() => {
(async () => {
const sync = await GLOBAL_STATE.adb!.sync();
try {
const content = new GatherStringStream();
const settings = JSON.parse(
await sync
.read(SCRCPY_SETTINGS_FILENAME)
.pipeThrough(new DecodeUtf8Stream())
.pipeTo(content);
const settings = JSON.parse(content.result);
.pipeThrough(new ConcatStringStream())
);
runInAction(() => {
SETTING_STATE.settings = {
...DEFAULT_SETTINGS,

View file

@ -62,14 +62,18 @@ export class AdbScrcpyOptions1_16 extends AdbScrcpyOptionsBase<ScrcpyOptionsInit
options: AdbScrcpyOptions<object>
): Promise<ScrcpyDisplay[]> {
try {
// Server will exit before opening connections when an invalid display id was given.
// Server will exit before opening connections when an invalid display id was given
// so `start` will throw an `AdbScrcpyExitedError`
const client = await AdbScrcpyClient.start(
adb,
path,
version,
options
);
// If the server didn't exit, manually stop it and throw an error
await client.close();
throw new Error("Unexpected server output");
} catch (e) {
if (e instanceof AdbScrcpyExitedError) {
const displays: ScrcpyDisplay[] = [];
@ -81,9 +85,9 @@ export class AdbScrcpyOptions1_16 extends AdbScrcpyOptionsBase<ScrcpyOptionsInit
}
return displays;
}
}
throw new Error("failed to get displays");
throw e;
}
}
public override getEncoders(

View file

@ -18,15 +18,20 @@ export class AdbScrcpyOptions2_0 extends AdbScrcpyOptionsBase<ScrcpyOptionsInit2
path: string,
version: string,
options: AdbScrcpyOptions<object>
) {
): Promise<ScrcpyEncoder[]> {
try {
// Similar to `AdbScrcpyOptions1_16.getDisplays`,
// server start process won't complete and `start `will throw
const client = await AdbScrcpyClient.start(
adb,
path,
version,
options
);
// If the server didn't exit, manually stop it and throw an error
await client.close();
throw new Error("Unexpected server output");
} catch (e) {
if (e instanceof AdbScrcpyExitedError) {
const encoders: ScrcpyEncoder[] = [];
@ -38,8 +43,9 @@ export class AdbScrcpyOptions2_0 extends AdbScrcpyOptionsBase<ScrcpyOptionsInit2
}
return encoders;
}
throw e;
}
throw new Error("Unexpected error");
}
public override async getEncoders(

View file

@ -1,5 +1,5 @@
import type { Consumable, ReadableWritablePair } from "@yume-chan/stream-extra";
import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
import type { ValueOrPromise } from "@yume-chan/struct";
import type { AdbBanner } from "./banner.js";
@ -93,11 +93,9 @@ export class Adb implements Closeable {
public async createSocketAndWait(service: string): Promise<string> {
const socket = await this.createSocket(service);
const gatherStream = new GatherStringStream();
await socket.readable
return await socket.readable
.pipeThrough(new DecodeUtf8Stream())
.pipeTo(gatherStream);
return gatherStream.result;
.pipeThrough(new ConcatStringStream());
}
public async getProp(key: string): Promise<string> {

View file

@ -1,4 +1,4 @@
import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
import { AdbCommandBase } from "../base.js";
@ -110,18 +110,19 @@ export class AdbSubprocess extends AdbCommandBase {
): Promise<AdbSubprocessWaitResult> {
const process = await this.spawn(command, options);
const stdout = new GatherStringStream();
const stderr = new GatherStringStream();
const [, , exitCode] = await Promise.all([
process.stdout.pipeThrough(new DecodeUtf8Stream()).pipeTo(stdout),
process.stderr.pipeThrough(new DecodeUtf8Stream()).pipeTo(stderr),
const [stdout, stderr, exitCode] = await Promise.all([
process.stdout
.pipeThrough(new DecodeUtf8Stream())
.pipeThrough(new ConcatStringStream()),
process.stderr
.pipeThrough(new DecodeUtf8Stream())
.pipeThrough(new ConcatStringStream()),
process.exit,
]);
return {
stdout: stdout.result,
stderr: stderr.result,
stdout,
stderr,
exitCode,
};
}

View file

@ -10,7 +10,7 @@ import {
AdbSubprocessNoneProtocol,
AdbSubprocessShellProtocol,
} from "@yume-chan/adb";
import { DecodeUtf8Stream, GatherStringStream } from "@yume-chan/stream-extra";
import { ConcatStringStream, DecodeUtf8Stream } from "@yume-chan/stream-extra";
export class Cmd extends AdbCommandBase {
#supportsShellV2: boolean;
@ -82,18 +82,19 @@ export class Cmd extends AdbCommandBase {
): Promise<AdbSubprocessWaitResult> {
const process = await this.spawn(true, command, ...args);
const stdout = new GatherStringStream();
const stderr = new GatherStringStream();
const [, , exitCode] = await Promise.all([
process.stdout.pipeThrough(new DecodeUtf8Stream()).pipeTo(stdout),
process.stderr.pipeThrough(new DecodeUtf8Stream()).pipeTo(stderr),
const [stdout, stderr, exitCode] = await Promise.all([
process.stdout
.pipeThrough(new DecodeUtf8Stream())
.pipeThrough(new ConcatStringStream()),
process.stderr
.pipeThrough(new DecodeUtf8Stream())
.pipeThrough(new ConcatStringStream()),
process.exit,
]);
return {
stdout: stdout.result,
stderr: stderr.result,
stdout,
stderr,
exitCode,
};
}

View file

@ -1,3 +1,36 @@
import { AdbCommandBase } from "@yume-chan/adb";
export class DumpSys extends AdbCommandBase {}
export class DumpSys extends AdbCommandBase {
async diskStats() {
const output = await this.adb.subprocess.spawnAndWaitLegacy([
"dumpsys",
"diskstats",
]);
function getSize(name: string) {
const match = output.match(
new RegExp(`${name}-Free: (\\d+)K / (\\d+)K`)
);
if (!match) {
return [0, 0];
}
return [
Number.parseInt(match[1]!, 10) * 1024,
Number.parseInt(match[2]!, 10) * 1024,
];
}
const [dataFree, dataTotal] = getSize("Data");
const [cacheFree, cacheTotal] = getSize("Cache");
const [systemFree, systemTotal] = getSize("System");
return {
dataFree,
dataTotal,
cacheFree,
cacheTotal,
systemFree,
systemTotal,
};
}
}

View file

@ -0,0 +1,131 @@
import { describe, expect, it } from "@jest/globals";
import { ConcatBufferStream, ConcatStringStream } from "./concat.js";
import { ReadableStream } from "./stream.js";
describe("ConcatStringStream", () => {
it("should have Promise interface", () => {
const readable = new ConcatStringStream().readable;
expect(readable).toBeInstanceOf(ReadableStream);
expect(readable).toHaveProperty("then", expect.any(Function));
expect(readable).toHaveProperty("catch", expect.any(Function));
expect(readable).toHaveProperty("finally", expect.any(Function));
});
it("should resolve to result", async () => {
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("foo");
controller.enqueue("bar");
controller.close();
},
}).pipeThrough(new ConcatStringStream());
await expect(readable).resolves.toBe("foobar");
});
it("should read result", async () => {
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("foo");
controller.enqueue("bar");
controller.close();
},
}).pipeThrough(new ConcatStringStream());
const reader = readable.getReader();
await expect(reader.read()).resolves.toEqual({
done: false,
value: "foobar",
});
await expect(reader.read()).resolves.toEqual({
done: true,
value: undefined,
});
});
it("should report error when aborted", async () => {
const stream = new ConcatStringStream();
const reason = "aborted";
await stream.writable.getWriter().abort(reason);
await expect(stream.readable).rejects.toBe(reason);
await expect(() => stream.readable.getReader().read()).rejects.toBe(
reason
);
});
});
describe("ConcatBufferStream", () => {
it("should have Promise interface", () => {
const readable = new ConcatBufferStream().readable;
expect(readable).toBeInstanceOf(ReadableStream);
expect(readable).toHaveProperty("then", expect.any(Function));
expect(readable).toHaveProperty("catch", expect.any(Function));
expect(readable).toHaveProperty("finally", expect.any(Function));
});
it("should return empty buffer if no input", async () => {
const readable = new ReadableStream<Uint8Array>({
start(controller) {
controller.close();
},
}).pipeThrough(new ConcatBufferStream());
await expect(readable).resolves.toEqual(new Uint8Array());
});
it("should return one segment", async () => {
const readable = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
}).pipeThrough(new ConcatBufferStream());
await expect(readable).resolves.toEqual(new Uint8Array([1, 2, 3]));
});
it("should resolve to result", async () => {
const readable = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.close();
},
}).pipeThrough(new ConcatBufferStream());
await expect(readable).resolves.toEqual(
new Uint8Array([1, 2, 3, 4, 5, 6])
);
});
it("should read result", async () => {
const readable = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.close();
},
}).pipeThrough(new ConcatBufferStream());
const reader = readable.getReader();
await expect(reader.read()).resolves.toEqual({
done: false,
value: new Uint8Array([1, 2, 3, 4, 5, 6]),
});
await expect(reader.read()).resolves.toEqual({
done: true,
value: undefined,
});
});
it("should report error when aborted", async () => {
const stream = new ConcatBufferStream();
const reason = "aborted";
await stream.writable.getWriter().abort(reason);
await expect(stream.readable).rejects.toBe(reason);
await expect(() => stream.readable.getReader().read()).rejects.toBe(
reason
);
});
});

View file

@ -0,0 +1,160 @@
import { PromiseResolver } from "@yume-chan/async";
import type { ReadableStreamDefaultController } from "./stream.js";
import { ReadableStream, WritableStream } from "./stream.js";
export interface ConcatStringReadableStream
extends ReadableStream<string>,
Promise<string> {}
// `TransformStream` only calls its `source.flush` method when its `readable` is being read.
// If the user want to use the `Promise` interface, the `flush` method will never be called,
// so the `PromiseResolver` will never be resolved.
// Thus we need to implement our own `TransformStream` using a `WritableStream` and a `ReadableStream`.
/**
* A `TransformStream` that concatenates strings.
*
* Its `readable` is also a `Promise<string>`, so it's possible to `await` it to get the result.
*
* ```ts
* const result: string = await readable.pipeThrough(new ConcatStringStream());
* ```
*/
export class ConcatStringStream {
// PERF: rope (concat strings) is faster than `[].join('')`
#result = "";
#resolver = new PromiseResolver<string>();
#writable = new WritableStream<string>({
write: (chunk) => {
this.#result += chunk;
},
close: () => {
this.#resolver.resolve(this.#result);
this.#readableController.enqueue(this.#result);
this.#readableController.close();
},
abort: (reason) => {
this.#resolver.reject(reason);
this.#readableController.error(reason);
},
});
public get writable(): WritableStream<string> {
return this.#writable;
}
#readableController!: ReadableStreamDefaultController<string>;
#readable = new ReadableStream<string>({
start: (controller) => {
this.#readableController = controller;
},
}) as ConcatStringReadableStream;
public get readable(): ConcatStringReadableStream {
return this.#readable;
}
public constructor() {
void Object.defineProperties(this.#readable, {
then: {
get: () =>
this.#resolver.promise.then.bind(this.#resolver.promise),
},
catch: {
get: () =>
this.#resolver.promise.catch.bind(this.#resolver.promise),
},
finally: {
get: () =>
this.#resolver.promise.finally.bind(this.#resolver.promise),
},
});
}
}
export interface ConcatBufferReadableStream
extends ReadableStream<Uint8Array>,
Promise<Uint8Array> {}
/**
* A `TransformStream` that concatenates `Uint8Array`s.
*
* If you want to decode the result as string,
* prefer `.pipeThrough(new DecodeUtf8Stream()).pipeThrough(new ConcatStringStream())`,
* than `.pipeThough(new ConcatBufferStream()).pipeThrough(new DecodeUtf8Stream())`,
* because concatenating strings is faster than concatenating `Uint8Array`s.
*/
export class ConcatBufferStream {
#segments: Uint8Array[] = [];
#resolver = new PromiseResolver<Uint8Array>();
#writable = new WritableStream<Uint8Array>({
write: (chunk) => {
this.#segments.push(chunk);
},
close: () => {
let result: Uint8Array;
let offset = 0;
switch (this.#segments.length) {
case 0:
result = new Uint8Array(0);
break;
case 1:
result = this.#segments[0]!;
break;
default:
result = new Uint8Array(
this.#segments.reduce(
(prev, item) => prev + item.length,
0
)
);
for (const segment of this.#segments) {
result.set(segment, offset);
offset += segment.length;
}
break;
}
this.#resolver.resolve(result);
this.#readableController.enqueue(result);
this.#readableController.close();
},
abort: (reason) => {
this.#resolver.reject(reason);
this.#readableController.error(reason);
},
});
public get writable(): WritableStream<Uint8Array> {
return this.#writable;
}
#readableController!: ReadableStreamDefaultController<Uint8Array>;
#readable = new ReadableStream<Uint8Array>({
start: (controller) => {
this.#readableController = controller;
},
}) as ConcatBufferReadableStream;
public get readable(): ConcatBufferReadableStream {
return this.#readable;
}
public constructor() {
void Object.defineProperties(this.#readable, {
then: {
get: () =>
this.#resolver.promise.then.bind(this.#resolver.promise),
},
catch: {
get: () =>
this.#resolver.promise.catch.bind(this.#resolver.promise),
},
finally: {
get: () =>
this.#resolver.promise.finally.bind(this.#resolver.promise),
},
});
}
}

View file

@ -1,17 +0,0 @@
import { WritableStream } from "./stream.js";
export class GatherStringStream extends WritableStream<string> {
// PERF: rope (concat strings) is faster than `[].join('')`
#result = "";
public get result() {
return this.#result;
}
public constructor() {
super({
write: (chunk) => {
this.#result += chunk;
},
});
}
}

View file

@ -1,10 +1,10 @@
export * from "./buffered-transform.js";
export * from "./buffered.js";
export * from "./concat.js";
export * from "./consumable.js";
export * from "./decode-utf8.js";
export * from "./distribution.js";
export * from "./duplex.js";
export * from "./gather-string.js";
export * from "./inspect.js";
export * from "./pipe-from.js";
export * from "./push-readable.js";

View file

@ -45,20 +45,7 @@ export type TransformStream<I = any, O = any> = TransformStreamPolyfill<I, O>;
export let TransformStream = TransformStreamPolyfill;
if (GLOBAL.ReadableStream && GLOBAL.WritableStream && GLOBAL.TransformStream) {
// Use browser native implementation
ReadableStream = GLOBAL.ReadableStream;
WritableStream = GLOBAL.WritableStream;
TransformStream = GLOBAL.TransformStream;
} else {
// TODO: enable loading Node.js stream implementation when bundler supports Top Level Await
// try {
// // Use Node.js native implementation
// const MODULE_NAME = "node:stream/web";
// const StreamWeb = (await import(MODULE_NAME)) as GlobalExtension;
// ReadableStream = StreamWeb.ReadableStream;
// WritableStream = StreamWeb.WritableStream;
// TransformStream = StreamWeb.TransformStream;
// } catch {
// // ignore
// }
}