refactor(stream): rename BufferedStream to BufferedReadableStream

refs #427
This commit is contained in:
Simon Chan 2022-06-12 21:24:06 +08:00
parent e779f80385
commit 2464b62a59
21 changed files with 57 additions and 39 deletions

View file

@ -51,7 +51,7 @@ module.exports = withMDX({
enforce: 'pre', enforce: 'pre',
}); });
config.experiments.topLevelAwait = true; // config.experiments.topLevelAwait = true;
return config; return config;
}, },

View file

@ -37,7 +37,7 @@ if (typeof window !== 'undefined') {
const { publicRuntimeConfig } = getConfig(); const { publicRuntimeConfig } = getConfig();
// Can't use `import` here because ESM is read-only (can't set `mitm` field) // Can't use `import` here because ESM is read-only (can't set `mitm` field)
// Add `await` here because top-level await is on, so every import can be a `Promise` // Add `await` here because top-level await is on, so every import can be a `Promise`
StreamSaver = await require('streamsaver'); StreamSaver = require('streamsaver');
StreamSaver.mitm = publicRuntimeConfig.basePath + '/StreamSaver/mitm.html'; StreamSaver.mitm = publicRuntimeConfig.basePath + '/StreamSaver/mitm.html';
} }

View file

@ -9,6 +9,9 @@
"references": [ "references": [
{ {
"path": "../adb/tsconfig.build.json" "path": "../adb/tsconfig.build.json"
},
{
"path": "../stream-extra/tsconfig.build.json"
} }
] ]
} }

View file

@ -13,6 +13,9 @@
{ {
"path": "../adb/tsconfig.build.json" "path": "../adb/tsconfig.build.json"
}, },
{
"path": "../stream-extra/tsconfig.build.json"
},
{ {
"path": "../struct/tsconfig.build.json" "path": "../struct/tsconfig.build.json"
} }

View file

@ -9,6 +9,9 @@
"references": [ "references": [
{ {
"path": "../adb/tsconfig.build.json" "path": "../adb/tsconfig.build.json"
},
{
"path": "../stream-extra/tsconfig.build.json"
} }
] ]
} }

View file

@ -1,4 +1,4 @@
import { BufferedStream } from '@yume-chan/stream-extra'; import { BufferedReadableStream } from '@yume-chan/stream-extra';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import type { Adb } from '../adb.js'; import type { Adb } from '../adb.js';
@ -61,7 +61,7 @@ export type AdbFrameBuffer = AdbFrameBufferV1 | AdbFrameBufferV2;
export async function framebuffer(adb: Adb): Promise<AdbFrameBuffer> { export async function framebuffer(adb: Adb): Promise<AdbFrameBuffer> {
const socket = await adb.createSocket('framebuffer:'); const socket = await adb.createSocket('framebuffer:');
const stream = new BufferedStream(socket.readable); const stream = new BufferedReadableStream(socket.readable);
const { version } = await Version.deserialize(stream); const { version } = await Version.deserialize(stream);
switch (version) { switch (version) {
case 1: case 1:

View file

@ -1,7 +1,7 @@
// cspell: ignore killforward // cspell: ignore killforward
import { AutoDisposable } from '@yume-chan/event'; import { AutoDisposable } from '@yume-chan/event';
import { BufferedStream, BufferedStreamEndedError } from '@yume-chan/stream-extra'; import { BufferedReadableStream, BufferedReadableStreamEndedError } from '@yume-chan/stream-extra';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import type { Adb } from '../adb.js'; import type { Adb } from '../adb.js';
@ -53,7 +53,7 @@ export class AdbReverseCommand extends AutoDisposable {
private async createBufferedStream(service: string) { private async createBufferedStream(service: string) {
const socket = await this.adb.createSocket(service); const socket = await this.adb.createSocket(service);
return new BufferedStream(socket.readable); return new BufferedReadableStream(socket.readable);
} }
private async sendRequest(service: string) { private async sendRequest(service: string) {
@ -97,7 +97,7 @@ export class AdbReverseCommand extends AutoDisposable {
try { try {
length = Number.parseInt(decodeUtf8(await stream.read(4)), 16); length = Number.parseInt(decodeUtf8(await stream.read(4)), 16);
} catch (e) { } catch (e) {
if (!(e instanceof BufferedStreamEndedError)) { if (!(e instanceof BufferedReadableStreamEndedError)) {
throw e; throw e;
} }

View file

@ -1,4 +1,4 @@
import type { BufferedStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import type { BufferedReadableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js';
@ -38,7 +38,7 @@ const LIST_V2_RESPONSE_TYPES = {
}; };
export async function* adbSyncOpenDir( export async function* adbSyncOpenDir(
stream: BufferedStream, stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>, writer: WritableStreamDefaultWriter<Uint8Array>,
path: string, path: string,
v2: boolean, v2: boolean,

View file

@ -1,4 +1,4 @@
import { BufferedStream, ReadableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import { BufferedReadableStream, ReadableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js';
@ -16,7 +16,7 @@ const RESPONSE_TYPES = {
}; };
export function adbSyncPull( export function adbSyncPull(
stream: BufferedStream, stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>, writer: WritableStreamDefaultWriter<Uint8Array>,
path: string, path: string,
): ReadableStream<Uint8Array> { ): ReadableStream<Uint8Array> {

View file

@ -1,4 +1,4 @@
import { BufferedStream, ChunkStream, pipeFrom, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import { BufferedReadableStream, ChunkStream, pipeFrom, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js';
@ -16,7 +16,7 @@ const ResponseTypes = {
export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024; export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024;
export function adbSyncPush( export function adbSyncPush(
stream: BufferedStream, stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>, writer: WritableStreamDefaultWriter<Uint8Array>,
filename: string, filename: string,
mode: number = (LinuxFileType.File << 12) | 0o666, mode: number = (LinuxFileType.File << 12) | 0o666,

View file

@ -1,4 +1,4 @@
import type { BufferedStream } from '@yume-chan/stream-extra'; import type { BufferedReadableStream } from '@yume-chan/stream-extra';
import Struct, { type StructAsyncDeserializeStream, type StructLike, type StructValueType } from '@yume-chan/struct'; import Struct, { type StructAsyncDeserializeStream, type StructLike, type StructValueType } from '@yume-chan/struct';
import { decodeUtf8 } from '../../utils/index.js'; import { decodeUtf8 } from '../../utils/index.js';
@ -43,7 +43,7 @@ export const AdbSyncFailResponse =
}); });
export async function adbSyncReadResponse<T extends Record<string, StructLike<any>>>( export async function adbSyncReadResponse<T extends Record<string, StructLike<any>>>(
stream: BufferedStream, stream: BufferedReadableStream,
types: T, types: T,
// When `T` is a union type, `T[keyof T]` only includes their common keys. // When `T` is a union type, `T[keyof T]` only includes their common keys.
// For example, let `type T = { a: string, b: string } | { a: string, c: string}`, // For example, let `type T = { a: string, b: string } | { a: string, c: string}`,

View file

@ -1,4 +1,4 @@
import type { BufferedStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import type { BufferedReadableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import Struct, { placeholder } from '@yume-chan/struct'; import Struct, { placeholder } from '@yume-chan/struct';
import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js'; import { AdbSyncRequestId, adbSyncWriteRequest } from './request.js';
@ -109,7 +109,7 @@ const LSTAT_V2_RESPONSE_TYPES = {
}; };
export async function adbSyncLstat( export async function adbSyncLstat(
stream: BufferedStream, stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>, writer: WritableStreamDefaultWriter<Uint8Array>,
path: string, path: string,
v2: boolean, v2: boolean,
@ -143,7 +143,7 @@ export async function adbSyncLstat(
} }
export async function adbSyncStat( export async function adbSyncStat(
stream: BufferedStream, stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>, writer: WritableStreamDefaultWriter<Uint8Array>,
path: string, path: string,
): Promise<AdbSyncStatResponse> { ): Promise<AdbSyncStatResponse> {

View file

@ -1,5 +1,5 @@
import { AutoDisposable } from '@yume-chan/event'; import { AutoDisposable } from '@yume-chan/event';
import { BufferedStream, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import { BufferedReadableStream, ReadableStream, WrapReadableStream, WrapWritableStream, WritableStream, WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import type { Adb } from '../../adb.js'; import type { Adb } from '../../adb.js';
import { AdbFeatures } from '../../features.js'; import { AdbFeatures } from '../../features.js';
@ -30,7 +30,7 @@ export function dirname(path: string): string {
export class AdbSync extends AutoDisposable { export class AdbSync extends AutoDisposable {
protected adb: Adb; protected adb: Adb;
protected stream: BufferedStream; protected stream: BufferedReadableStream;
protected writer: WritableStreamDefaultWriter<Uint8Array>; protected writer: WritableStreamDefaultWriter<Uint8Array>;
@ -57,7 +57,7 @@ export class AdbSync extends AutoDisposable {
super(); super();
this.adb = adb; this.adb = adb;
this.stream = new BufferedStream(socket.readable); this.stream = new BufferedReadableStream(socket.readable);
this.writer = socket.writable.getWriter(); this.writer = socket.writable.getWriter();
} }

View file

@ -4,6 +4,9 @@
{ {
"path": "../event/tsconfig.json" "path": "../event/tsconfig.json"
}, },
{
"path": "../stream-extra/tsconfig.build.json"
},
{ {
"path": "../struct/tsconfig.build.json" "path": "../struct/tsconfig.build.json"
}, },

View file

@ -1,7 +1,7 @@
// cspell: ignore logcat // cspell: ignore logcat
import { AdbCommandBase, AdbSubprocessNoneProtocol } from '@yume-chan/adb'; import { AdbCommandBase, AdbSubprocessNoneProtocol } from '@yume-chan/adb';
import { BufferedStream, BufferedStreamEndedError, DecodeUtf8Stream, ReadableStream, SplitStringStream, WritableStream } from '@yume-chan/stream-extra'; import { BufferedReadableStream, BufferedReadableStreamEndedError, DecodeUtf8Stream, ReadableStream, SplitStringStream, WritableStream } from '@yume-chan/stream-extra';
import Struct, { decodeUtf8, StructAsyncDeserializeStream } from '@yume-chan/struct'; import Struct, { decodeUtf8, StructAsyncDeserializeStream } from '@yume-chan/struct';
// `adb logcat` is an alias to `adb shell logcat` // `adb logcat` is an alias to `adb shell logcat`
@ -185,7 +185,7 @@ export class Logcat extends AdbCommandBase {
} }
public binary(options?: LogcatOptions): ReadableStream<AndroidLogEntry> { public binary(options?: LogcatOptions): ReadableStream<AndroidLogEntry> {
let bufferedStream: BufferedStream; let bufferedStream: BufferedReadableStream;
return new ReadableStream({ return new ReadableStream({
start: async () => { start: async () => {
const { stdout } = await this.adb.subprocess.spawn([ const { stdout } = await this.adb.subprocess.spawn([
@ -197,14 +197,14 @@ export class Logcat extends AdbCommandBase {
// PERF: None protocol is 150% faster then Shell protocol // PERF: None protocol is 150% faster then Shell protocol
protocols: [AdbSubprocessNoneProtocol], protocols: [AdbSubprocessNoneProtocol],
}); });
bufferedStream = new BufferedStream(stdout); bufferedStream = new BufferedReadableStream(stdout);
}, },
async pull(controller) { async pull(controller) {
try { try {
const entry = await deserializeAndroidLogEntry(bufferedStream); const entry = await deserializeAndroidLogEntry(bufferedStream);
controller.enqueue(entry); controller.enqueue(entry);
} catch (e) { } catch (e) {
if (e instanceof BufferedStreamEndedError) { if (e instanceof BufferedReadableStreamEndedError) {
controller.close(); controller.close();
return; return;
} }

View file

@ -3,6 +3,9 @@
"references": [ "references": [
{ {
"path": "../adb/tsconfig.build.json" "path": "../adb/tsconfig.build.json"
},
{
"path": "../stream-extra/tsconfig.build.json"
} }
] ]
} }

View file

@ -1,5 +1,5 @@
import { EventEmitter } from '@yume-chan/event'; import { EventEmitter } from '@yume-chan/event';
import { BufferedStream, ReadableWritablePair, type WritableStreamDefaultWriter } from '@yume-chan/stream-extra'; import { BufferedReadableStream, ReadableWritablePair, type WritableStreamDefaultWriter } from '@yume-chan/stream-extra';
import type { ScrcpyInjectScrollControlMessage1_22, ScrcpyOptions } from '../options/index.js'; import type { ScrcpyInjectScrollControlMessage1_22, ScrcpyOptions } from '../options/index.js';
import { ClipboardMessage } from './clipboard.js'; import { ClipboardMessage } from './clipboard.js';
@ -25,7 +25,7 @@ export class ScrcpyControlClient {
) { ) {
this.options = options; this.options = options;
const buffered = new BufferedStream(stream.readable); const buffered = new BufferedReadableStream(stream.readable);
this.writer = stream.writable.getWriter(); this.writer = stream.writable.getWriter();
(async () => { (async () => {
try { try {

View file

@ -16,6 +16,9 @@
{ {
"path": "../event/tsconfig.json" "path": "../event/tsconfig.json"
}, },
{
"path": "../stream-extra/tsconfig.build.json"
},
{ {
"path": "../struct/tsconfig.build.json" "path": "../struct/tsconfig.build.json"
} }

View file

@ -1,6 +1,6 @@
import { describe, expect, it } from '@jest/globals'; import { describe, expect, it } from '@jest/globals';
import { BufferedStream } from "./buffered.js"; import { BufferedReadableStream } from "./buffered.js";
import { ReadableStream } from "./stream.js"; import { ReadableStream } from "./stream.js";
function randomUint8Array(length: number) { function randomUint8Array(length: number) {
@ -38,7 +38,7 @@ async function runTest(inputSizes: number[], readSizes: number[]) {
} }
const stream = new MockReadableStream(buffers); const stream = new MockReadableStream(buffers);
const buffered = new BufferedStream(stream); const buffered = new BufferedReadableStream(stream);
index = 0; index = 0;
for (const size of readSizes) { for (const size of readSizes) {
@ -52,14 +52,14 @@ describe('BufferedStream', () => {
describe('read 1 time', () => { describe('read 1 time', () => {
it('read 0 buffer', async () => { it('read 0 buffer', async () => {
const source = new MockReadableStream([]); const source = new MockReadableStream([]);
const buffered = new BufferedStream(source); const buffered = new BufferedReadableStream(source);
await expect(buffered.read(10)).rejects.toThrow(); await expect(buffered.read(10)).rejects.toThrow();
}); });
it('input 1 exact buffer', async () => { it('input 1 exact buffer', async () => {
const input = randomUint8Array(10); const input = randomUint8Array(10);
const source = new MockReadableStream([input]); const source = new MockReadableStream([input]);
const buffered = new BufferedStream(source); const buffered = new BufferedReadableStream(source);
await expect(buffered.read(10)).resolves.toBe(input); await expect(buffered.read(10)).resolves.toBe(input);
}); });
@ -69,7 +69,7 @@ describe('BufferedStream', () => {
it('read 1 small buffer', async () => { it('read 1 small buffer', async () => {
const source = new MockReadableStream([randomUint8Array(5)]); const source = new MockReadableStream([randomUint8Array(5)]);
const buffered = new BufferedStream(source); const buffered = new BufferedReadableStream(source);
await expect(buffered.read(10)).rejects.toThrow(); await expect(buffered.read(10)).rejects.toThrow();
}); });
@ -79,7 +79,7 @@ describe('BufferedStream', () => {
it('read 2 small buffers', async () => { it('read 2 small buffers', async () => {
const source = new MockReadableStream([randomUint8Array(5), randomUint8Array(5)]); const source = new MockReadableStream([randomUint8Array(5), randomUint8Array(5)]);
const buffered = new BufferedStream(source); const buffered = new BufferedReadableStream(source);
await expect(buffered.read(20)).rejects.toThrow(); await expect(buffered.read(20)).rejects.toThrow();
}); });

View file

@ -1,7 +1,7 @@
import { PushReadableStream } from "./push-readable.js"; import { PushReadableStream } from "./push-readable.js";
import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js"; import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";
export class BufferedStreamEndedError extends Error { export class BufferedReadableStreamEndedError extends Error {
public constructor() { public constructor() {
super('Stream ended'); super('Stream ended');
@ -10,7 +10,7 @@ export class BufferedStreamEndedError extends Error {
} }
} }
export class BufferedStream { export class BufferedReadableStream {
private buffered: Uint8Array | undefined; private buffered: Uint8Array | undefined;
private bufferedOffset = 0; private bufferedOffset = 0;
private bufferedLength = 0; private bufferedLength = 0;
@ -27,7 +27,7 @@ export class BufferedStream {
private async readSource() { private async readSource() {
const { done, value } = await this.reader.read(); const { done, value } = await this.reader.read();
if (done) { if (done) {
throw new BufferedStreamEndedError(); throw new BufferedReadableStreamEndedError();
} }
return value; return value;
} }

View file

@ -1,6 +1,6 @@
import type Struct from "@yume-chan/struct"; import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct"; import type { StructValueType } from "@yume-chan/struct";
import { BufferedStream, BufferedStreamEndedError } from "./buffered.js"; import { BufferedReadableStream, BufferedReadableStreamEndedError } from "./buffered.js";
import { PushReadableStream, PushReadableStreamController } from "./push-readable.js"; import { PushReadableStream, PushReadableStreamController } from "./push-readable.js";
import { ReadableStream, WritableStream, type ReadableWritablePair } from "./stream.js"; import { ReadableStream, WritableStream, type ReadableWritablePair } from "./stream.js";
@ -16,7 +16,7 @@ export class StructDeserializeStream<T extends Struct<any, any, any, any>>
public constructor(struct: T) { public constructor(struct: T) {
// Convert incoming chunks to a `BufferedStream` // Convert incoming chunks to a `BufferedStream`
let incomingStreamController!: PushReadableStreamController<Uint8Array>; let incomingStreamController!: PushReadableStreamController<Uint8Array>;
const incomingStream = new BufferedStream( const incomingStream = new BufferedReadableStream(
new PushReadableStream<Uint8Array>( new PushReadableStream<Uint8Array>(
controller => incomingStreamController = controller, controller => incomingStreamController = controller,
) )
@ -28,7 +28,7 @@ export class StructDeserializeStream<T extends Struct<any, any, any, any>>
const value = await struct.deserialize(incomingStream); const value = await struct.deserialize(incomingStream);
controller.enqueue(value); controller.enqueue(value);
} catch (e) { } catch (e) {
if (e instanceof BufferedStreamEndedError) { if (e instanceof BufferedReadableStreamEndedError) {
controller.close(); controller.close();
return; return;
} }