refactor(stream): create BufferedTransformStream to cover common transformation usage of BufferedStream

This commit is contained in:
Simon Chan 2022-06-25 13:52:45 +08:00
parent 3a8ec41293
commit 17448eb1d4
7 changed files with 94 additions and 87 deletions

View file

@ -44,20 +44,20 @@ export async function* adbSyncOpenDir(
v2: boolean, v2: boolean,
): AsyncGenerator<AdbSyncEntry, void, void> { ): AsyncGenerator<AdbSyncEntry, void, void> {
let requestId: AdbSyncRequestId.List | AdbSyncRequestId.List2; let requestId: AdbSyncRequestId.List | AdbSyncRequestId.List2;
let responseType: typeof LIST_V1_RESPONSE_TYPES | typeof LIST_V2_RESPONSE_TYPES; let responseTypes: typeof LIST_V1_RESPONSE_TYPES | typeof LIST_V2_RESPONSE_TYPES;
if (v2) { if (v2) {
requestId = AdbSyncRequestId.List2; requestId = AdbSyncRequestId.List2;
responseType = LIST_V2_RESPONSE_TYPES; responseTypes = LIST_V2_RESPONSE_TYPES;
} else { } else {
requestId = AdbSyncRequestId.List; requestId = AdbSyncRequestId.List;
responseType = LIST_V1_RESPONSE_TYPES; responseTypes = LIST_V1_RESPONSE_TYPES;
} }
await adbSyncWriteRequest(writer, requestId, path); await adbSyncWriteRequest(writer, requestId, path);
while (true) { while (true) {
const response = await adbSyncReadResponse(stream, responseType); const response = await adbSyncReadResponse(stream, responseTypes);
switch (response.id) { switch (response.id) {
case AdbSyncResponseId.Entry: case AdbSyncResponseId.Entry:
yield { yield {

View file

@ -115,23 +115,24 @@ export async function adbSyncLstat(
v2: boolean, v2: boolean,
): Promise<AdbSyncStat> { ): Promise<AdbSyncStat> {
let requestId: AdbSyncRequestId.Lstat | AdbSyncRequestId.Lstat2; let requestId: AdbSyncRequestId.Lstat | AdbSyncRequestId.Lstat2;
let responseType: typeof LSTAT_RESPONSE_TYPES | typeof LSTAT_V2_RESPONSE_TYPES; let responseTypes: typeof LSTAT_RESPONSE_TYPES | typeof LSTAT_V2_RESPONSE_TYPES;
if (v2) { if (v2) {
requestId = AdbSyncRequestId.Lstat2; requestId = AdbSyncRequestId.Lstat2;
responseType = LSTAT_V2_RESPONSE_TYPES; responseTypes = LSTAT_V2_RESPONSE_TYPES;
} else { } else {
requestId = AdbSyncRequestId.Lstat; requestId = AdbSyncRequestId.Lstat;
responseType = LSTAT_RESPONSE_TYPES; responseTypes = LSTAT_RESPONSE_TYPES;
} }
await adbSyncWriteRequest(writer, requestId, path); await adbSyncWriteRequest(writer, requestId, path);
const response = await adbSyncReadResponse(stream, responseType); const response = await adbSyncReadResponse(stream, responseTypes);
switch (response.id) { switch (response.id) {
case AdbSyncResponseId.Lstat: case AdbSyncResponseId.Lstat:
return { return {
mode: response.mode, mode: response.mode,
// Convert to `BigInt` to make it compatible with `AdbSyncStatResponse`
size: BigInt(response.size), size: BigInt(response.size),
mtime: BigInt(response.mtime), mtime: BigInt(response.mtime),
get type() { return response.type; }, get type() { return response.type; },

View file

@ -31,9 +31,10 @@ export class AdbSync extends AutoDisposable {
protected adb: Adb; protected adb: Adb;
protected stream: BufferedReadableStream; protected stream: BufferedReadableStream;
// Getting another writer on a locked WritableStream will throw.
// We don't want this behavior on higher-level APIs.
// So we acquire the writer early and use a blocking lock to guard it.
protected writer: WritableStreamDefaultWriter<Uint8Array>; protected writer: WritableStreamDefaultWriter<Uint8Array>;
protected sendLock = this.addDisposable(new AutoResetEvent()); protected sendLock = this.addDisposable(new AutoResetEvent());
public get supportsStat(): boolean { public get supportsStat(): boolean {
@ -152,7 +153,7 @@ export class AdbSync extends AutoDisposable {
if (this.needPushMkdirWorkaround) { if (this.needPushMkdirWorkaround) {
// It may fail if the path is already existed. // It may fail if the path is already existed.
// Ignore the result. // Ignore the result.
// TODO: sync: test this // TODO: sync: test push mkdir workaround (need an Android 8 device)
await this.adb.subprocess.spawnAndWait([ await this.adb.subprocess.spawnAndWait([
'mkdir', 'mkdir',
'-p', '-p',

View file

@ -1,7 +1,7 @@
// cspell: ignore logcat // cspell: ignore logcat
import { AdbCommandBase, AdbSubprocessNoneProtocol } from '@yume-chan/adb'; import { AdbCommandBase, AdbSubprocessNoneProtocol } from '@yume-chan/adb';
import { BufferedReadableStream, BufferedReadableStreamEndedError, DecodeUtf8Stream, ReadableStream, SplitStringStream, WritableStream } from '@yume-chan/stream-extra'; import { BufferedTransformStream, DecodeUtf8Stream, SplitStringStream, WrapReadableStream, WritableStream, type ReadableStream } from '@yume-chan/stream-extra';
import Struct, { decodeUtf8, StructAsyncDeserializeStream } from '@yume-chan/struct'; import Struct, { decodeUtf8, StructAsyncDeserializeStream } from '@yume-chan/struct';
// `adb logcat` is an alias to `adb shell logcat` // `adb logcat` is an alias to `adb shell logcat`
@ -185,36 +185,21 @@ export class Logcat extends AdbCommandBase {
} }
public binary(options?: LogcatOptions): ReadableStream<AndroidLogEntry> { public binary(options?: LogcatOptions): ReadableStream<AndroidLogEntry> {
let bufferedStream: BufferedReadableStream; return new WrapReadableStream(async () => {
return new ReadableStream({ // TODO: make `spawn` return synchronously with streams pending
start: async () => { // so it's easier to chain them.
const { stdout } = await this.adb.subprocess.spawn([ const { stdout } = await this.adb.subprocess.spawn([
'logcat', 'logcat',
'-B', '-B',
...(options?.pid ? ['--pid', options.pid.toString()] : []), ...(options?.pid ? ['--pid', options.pid.toString()] : []),
...(options?.ids ? ['-b', Logcat.joinLogId(options.ids)] : []) ...(options?.ids ? ['-b', Logcat.joinLogId(options.ids)] : [])
], { ], {
// PERF: None protocol is 150% faster then Shell protocol // PERF: None protocol is 150% faster then Shell protocol
protocols: [AdbSubprocessNoneProtocol], protocols: [AdbSubprocessNoneProtocol],
}); });
bufferedStream = new BufferedReadableStream(stdout); return stdout;
}, }).pipeThrough(new BufferedTransformStream(stream => {
async pull(controller) { return deserializeAndroidLogEntry(stream);
try { }))
const entry = await deserializeAndroidLogEntry(bufferedStream);
controller.enqueue(entry);
} catch (e) {
if (e instanceof BufferedReadableStreamEndedError) {
controller.close();
return;
}
throw e;
}
},
cancel() {
bufferedStream.close();
},
});
} }
} }

View file

@ -0,0 +1,59 @@
import type { ValueOrPromise } from '@yume-chan/struct';
import { BufferedReadableStream, BufferedReadableStreamEndedError } from './buffered.js';
import { PushReadableStream, PushReadableStreamController } from './push-readable.js';
import { ReadableStream, ReadableWritablePair, WritableStream } from './stream.js';
// TODO: BufferedTransformStream: find better implementation
export class BufferedTransformStream<T> implements ReadableWritablePair<T, Uint8Array> {
private _readable: ReadableStream<T>;
public get readable() { return this._readable; }
private _writable: WritableStream<Uint8Array>;
public get writable() { return this._writable; }
constructor(transform: (stream: BufferedReadableStream) => ValueOrPromise<T>) {
// Convert incoming chunks to a `BufferedReadableStream`
let sourceStreamController!: PushReadableStreamController<Uint8Array>;
const sourceStream = new PushReadableStream<Uint8Array>(
controller =>
sourceStreamController = controller,
)
const buffered = new BufferedReadableStream(sourceStream);
this._readable = new ReadableStream<T>({
async pull(controller) {
try {
const value = await transform(buffered);
controller.enqueue(value);
} catch (e) {
// TODO: BufferedTransformStream: The semantic of stream ending is not clear
// If the `transform` started but did not finish, it should really be an error?
// But we can't detect that, unless there is a `peek` method on buffered stream.
if (e instanceof BufferedReadableStreamEndedError) {
controller.close();
return;
}
throw e;
}
},
cancel: (reason) => {
// Propagate cancel to the source stream
// So future writes will be rejected
sourceStream.cancel(reason);
}
});
this._writable = new WritableStream({
async write(chunk) {
await sourceStreamController.enqueue(chunk);
},
abort() {
sourceStreamController.close();
},
close() {
sourceStreamController.close();
},
});
}
}

View file

@ -1,3 +1,4 @@
export * from './buffered-transform.js';
export * from './buffered.js'; export * from './buffered.js';
export * from './chunk.js'; export * from './chunk.js';
export * from './decode-utf8.js'; export * from './decode-utf8.js';

View file

@ -1,52 +1,12 @@
import type Struct from "@yume-chan/struct"; import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct"; import type { StructValueType } from "@yume-chan/struct";
import { BufferedReadableStream, BufferedReadableStreamEndedError } from "./buffered.js"; import { BufferedTransformStream } from './buffered-transform.js';
import { PushReadableStream, PushReadableStreamController } from "./push-readable.js";
import { ReadableStream, WritableStream, type ReadableWritablePair } from "./stream.js";
// TODO: StructTransformStream: Looking for better implementation
export class StructDeserializeStream<T extends Struct<any, any, any, any>> export class StructDeserializeStream<T extends Struct<any, any, any, any>>
implements ReadableWritablePair<Uint8Array, StructValueType<T>>{ extends BufferedTransformStream<StructValueType<T>> {
private _readable: ReadableStream<StructValueType<T>>;
public get readable() { return this._readable; }
private _writable: WritableStream<Uint8Array>;
public get writable() { return this._writable; }
public constructor(struct: T) { public constructor(struct: T) {
// Convert incoming chunks to a `BufferedStream` super((stream) => {
let incomingStreamController!: PushReadableStreamController<Uint8Array>; return struct.deserialize(stream)
const incomingStream = new BufferedReadableStream(
new PushReadableStream<Uint8Array>(
controller => incomingStreamController = controller,
)
);
this._readable = new ReadableStream<StructValueType<T>>({
async pull(controller) {
try {
const value = await struct.deserialize(incomingStream);
controller.enqueue(value);
} catch (e) {
if (e instanceof BufferedReadableStreamEndedError) {
controller.close();
return;
}
throw e;
}
}
});
this._writable = new WritableStream({
async write(chunk) {
await incomingStreamController.enqueue(chunk);
},
abort() {
incomingStreamController.close();
},
close() {
incomingStreamController.close();
},
}); });
} }
} }