refactor: optimize streams

This commit is contained in:
Simon Chan 2022-03-01 15:19:34 +08:00
parent f4016df906
commit a92d80951b
No known key found for this signature in database
GPG key ID: 8F75717685A974FB
8 changed files with 133 additions and 105 deletions

View file

@ -1,6 +1,6 @@
// cspell: ignore scrollback // cspell: ignore scrollback
import { AdbSubprocessProtocol, encodeUtf8 } from "@yume-chan/adb"; import { AbortController, AdbSubprocessProtocol, encodeUtf8 } from "@yume-chan/adb";
import { AutoDisposable } from "@yume-chan/event"; import { AutoDisposable } from "@yume-chan/event";
import { Terminal } from 'xterm'; import { Terminal } from 'xterm';
import { FitAddon } from 'xterm-addon-fit'; import { FitAddon } from 'xterm-addon-fit';

View file

@ -548,6 +548,7 @@ const FileManager: NextPage = (): JSX.Element | null => {
const sync = await globalState.device!.sync(); const sync = await globalState.device!.sync();
try { try {
const readable = sync.read(path); const readable = sync.read(path);
// @ts-ignore ReadableStream definitions are slightly incompatible
const response = new Response(readable); const response = new Response(readable);
const blob = await response.blob(); const blob = await response.blob();
const url = window.URL.createObjectURL(blob); const url = window.URL.createObjectURL(blob);

View file

@ -533,13 +533,15 @@ class ScrcpyPageState {
async stop() { async stop() {
// Request to close client first // Request to close client first
await this.client?.close(); await this.client?.close();
this.client = undefined;
// Otherwise some packets may still arrive at decoder // Otherwise some packets may still arrive at decoder
this.decoder?.dispose(); this.decoder?.dispose();
this.decoder = undefined;
this.running = false; runInAction(() => {
this.client = undefined;
this.decoder = undefined;
this.running = false;
});
} }
handleDeviceViewRef(element: DeviceViewRef | null) { handleDeviceViewRef(element: DeviceViewRef | null) {

View file

@ -37,7 +37,8 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair<Uint8Array,
// "ok" and "babble" both have received `data`, // "ok" and "babble" both have received `data`,
// "babble" just means there is more data to be read. // "babble" just means there is more data to be read.
// From spec, the `result.data` always covers the whole `buffer`. // From spec, the `result.data` always covers the whole `buffer`.
controller.enqueue(new Uint8Array(result.data!.buffer)); const chunk = new Uint8Array(result.data!.buffer);
controller.enqueue(chunk);
}, },
}, { }, {
highWaterMark: 16 * 1024, highWaterMark: 16 * 1024,

View file

@ -172,30 +172,30 @@ export class StructDeserializeStream<T extends Struct<any, any, any, any>>
public constructor(struct: T) { public constructor(struct: T) {
// Convert incoming chunks to a `BufferedStream` // Convert incoming chunks to a `BufferedStream`
let incomingStreamController!: PushReadableStreamController<Uint8Array>; let incomingStreamController!: PushReadableStreamController<Uint8Array>;
const incomingStream = new BufferedStream(new PushReadableStream<Uint8Array>( const incomingStream = new BufferedStream(
controller => incomingStreamController = controller, new PushReadableStream<Uint8Array>(
{ controller => incomingStreamController = controller,
highWaterMark: struct.size * 5, )
size(chunk) { return chunk.byteLength; }, );
}
));
this._readable = new PushReadableStream<StructValueType<T>>(async controller => { this._readable = new PushReadableStream<StructValueType<T>>(
try { async controller => {
// Unless we make `deserialize` be capable of pausing/resuming, try {
// We always need at least one pull loop // Unless we make `deserialize` be capable of pausing/resuming,
while (true) { // We always need at least one pull loop
const value = await struct.deserialize(incomingStream); while (true) {
controller.enqueue(value); const value = await struct.deserialize(incomingStream);
await controller.enqueue(value);
}
} catch (e) {
if (e instanceof BufferedStreamEndedError) {
controller.close();
return;
}
controller.error(e);
} }
} catch (e) {
if (e instanceof BufferedStreamEndedError) {
controller.close();
return;
}
controller.error(e);
} }
}); );
this._writable = new WritableStream({ this._writable = new WritableStream({
async write(chunk) { async write(chunk) {
@ -376,34 +376,26 @@ export type PushReadableStreamSource<T> = (controller: PushReadableStreamControl
export class PushReadableStream<T> extends ReadableStream<T> { export class PushReadableStream<T> extends ReadableStream<T> {
public constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>) { public constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>) {
let pendingPull: PromiseResolver<T> | undefined; let waterMarkLow: PromiseResolver<void> | undefined;
const canceled: AbortController = new AbortController();
let pendingPush: T | undefined;
let pendingPushFinished: PromiseResolver<void> | undefined;
let canceled = false;
let canceledAbortController: AbortController = new AbortController();
super({ super({
start: (controller) => { start: (controller) => {
source({ source({
abortSignal: canceledAbortController.signal, abortSignal: canceled.signal,
async enqueue(chunk) { async enqueue(chunk) {
if (pendingPull) { // Only when the stream in errored, `desiredSize` will be `null`.
pendingPull.resolve(chunk); // But since `null <= 0` is `true`
pendingPull = undefined; // (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`),
return; // not handling it will cause a deadlock.
if ((controller.desiredSize ?? 1) <= 0) {
waterMarkLow = new PromiseResolver<void>();
await waterMarkLow.promise;
} }
// When cancelled, let `enqueue` to throw an native error // `controller.enqueue` will throw error for us
if (canceled || (controller.desiredSize ?? 1 > 0)) { // if the stream is already errored.
controller.enqueue(chunk); controller.enqueue(chunk);
return;
}
pendingPush = chunk;
pendingPushFinished = new PromiseResolver();
return pendingPushFinished.promise;
}, },
close() { close() {
controller.close(); controller.close();
@ -413,24 +405,12 @@ export class PushReadableStream<T> extends ReadableStream<T> {
}, },
}); });
}, },
pull: async (controller) => { pull: () => {
if (pendingPushFinished) { waterMarkLow?.resolve();
controller.enqueue(pendingPush);
pendingPushFinished!.resolve();
pendingPushFinished = undefined;
return;
}
pendingPull = new PromiseResolver<T>();
return pendingPull.promise.then((chunk) => {
controller.enqueue(chunk);
});
}, },
cancel: async (reason) => { cancel: async (reason) => {
if (pendingPushFinished) { waterMarkLow?.reject(reason);
pendingPushFinished.reject(reason); canceled.abort();
}
canceledAbortController.abort();
}, },
}, strategy); }, strategy);
} }

View file

@ -541,34 +541,30 @@ export class Struct<
const value = new StructValue(); const value = new StructValue();
Object.defineProperties(value.value, this._extra); Object.defineProperties(value.value, this._extra);
return Syncbird.try(() => { return Syncbird
const iterator = this._fields[Symbol.iterator](); .each(this._fields, ([name, definition]) => {
const iterate: () => StructValue | Syncbird<StructValue> = () => {
const result = iterator.next();
if (result.done) {
return value;
}
const [name, definition] = result.value;
return Syncbird.resolve( return Syncbird.resolve(
definition.deserialize(this.options, stream as any, value) definition.deserialize(this.options, stream as any, value)
).then(fieldValue => { ).then(fieldValue => {
value.set(name, fieldValue); value.set(name, fieldValue);
return iterate();
}); });
}; })
return iterate(); .then(() => {
}).then(value => { const object = value.value;
if (this._postDeserialized) {
const object = value.value as TFields;
const result = this._postDeserialized.call(object, object);
if (result) {
return result;
}
}
return value.value; // Run `postDeserialized`
}).valueOrPromise(); if (this._postDeserialized) {
const override = this._postDeserialized.call(object, object);
// If it returns a new value, use that as result
// Otherwise it only inspects/mutates the object in place.
if (override) {
return override;
}
}
return object;
})
.valueOrPromise();
} }
public serialize(init: Evaluate<Omit<TFields, TOmitInitKey>>): Uint8Array; public serialize(init: Evaluate<Omit<TFields, TOmitInitKey>>): Uint8Array;

View file

@ -2,7 +2,8 @@
import Bluebird from 'bluebird'; import Bluebird from 'bluebird';
type Resolvable<R> = R | PromiseLike<R>; export type Resolvable<R> = R | PromiseLike<R>;
export type IterateFunction<T, R> = (item: T, index: number, arrayLength: number) => Resolvable<R>;
export interface Syncbird<R> extends Bluebird<R> { export interface Syncbird<R> extends Bluebird<R> {
valueOrPromise(): R | PromiseLike<R>; valueOrPromise(): R | PromiseLike<R>;
@ -15,6 +16,39 @@ export interface Syncbird<R> extends Bluebird<R> {
} }
interface SyncbirdStatic { interface SyncbirdStatic {
/**
* Iterate over an array, or a promise of an array,
* which contains promises (or a mix of promises and values) with the given iterator function with the signature `(item, index, value)`
* where item is the resolved value of a respective promise in the input array.
* Iteration happens serially. If any promise in the input array is rejected the returned promise is rejected as well.
*
* Resolves to the original array unmodified, this method is meant to be used for side effects.
* If the iterator function returns a promise or a thenable, the result for the promise is awaited for before continuing with next iteration.
*/
each<R>(
values: Resolvable<Iterable<Resolvable<R>>>,
iterator: IterateFunction<R, any>
): Syncbird<R[]>;
/**
* Reduce an array, or a promise of an array,
* which contains a promises (or a mix of promises and values) with the given `reducer` function with the signature `(total, current, index, arrayLength)`
* where `item` is the resolved value of a respective promise in the input array.
* If any promise in the input array is rejected the returned promise is rejected as well.
*
* If the reducer function returns a promise or a thenable, the result for the promise is awaited for before continuing with next iteration.
*
* *The original array is not modified. If no `initialValue` is given and the array doesn't contain at least 2 items,
* the callback will not be called and `undefined` is returned.
*
* If `initialValue` is given and the array doesn't have at least 1 item, `initialValue` is returned.*
*/
reduce<R, U>(
values: Resolvable<Iterable<Resolvable<R>>>,
reducer: (total: U, current: R, index: number, arrayLength: number) => Resolvable<U>,
initialValue?: U
): Syncbird<U>;
/** /**
* Create a promise that is resolved with the given `value`. If `value` is a thenable or promise, the returned promise will assume its state. * Create a promise that is resolved with the given `value`. If `value` is a thenable or promise, the returned promise will assume its state.
*/ */
@ -29,20 +63,31 @@ interface SyncbirdStatic {
export const Syncbird: SyncbirdStatic = Bluebird.getNewLibraryCopy() as any; export const Syncbird: SyncbirdStatic = Bluebird.getNewLibraryCopy() as any;
const _then = Bluebird.prototype.then; // Bluebird uses `_then` internally.
Syncbird.prototype.then = function <T, TResult1 = T, TResult2 = never>( const _then = (Syncbird.prototype as any)._then;
this: Bluebird<T>, Syncbird.prototype._then = function <T, TResult1 = T, TResult2 = never>(
onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, this: Syncbird<T>,
onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null onfulfilled: ((value: T) => unknown) | undefined | null,
): Syncbird<TResult1 | TResult2> { onrejected: ((reason: any) => unknown) | undefined | null,
_: never,
receiver: unknown,
internalData: unknown,
): Syncbird<unknown> {
if (this.isFulfilled()) { if (this.isFulfilled()) {
if (!onfulfilled) { if (!onfulfilled) {
return this as unknown as Syncbird<TResult1>; return this;
} else { } else {
return Syncbird.resolve(onfulfilled(this.value())) as Syncbird<TResult1 | TResult2>; // Synchronously call `onfulfilled`, and wrap the result in a new `Syncbird` object.
return Syncbird.resolve(
onfulfilled.call(
receiver,
this.value()
)
);
} }
} else { } else {
return _then.call(this, onfulfilled, onrejected) as Syncbird<TResult1 | TResult2>; // Forward to Bluebird's `_then` method.
return _then.call(this, onfulfilled, onrejected, _, receiver, internalData);
} }
}; };

View file

@ -83,16 +83,19 @@ export class NumberFieldDefinition<
stream: StructDeserializeStream | StructAsyncDeserializeStream, stream: StructDeserializeStream | StructAsyncDeserializeStream,
struct: StructValue, struct: StructValue,
): ValueOrPromise<NumberFieldValue<this>> { ): ValueOrPromise<NumberFieldValue<this>> {
return Syncbird.try(() => { return Syncbird
return stream.read(this.getSize()); .try(() => {
}).then(array => { return stream.read(this.getSize());
const view = new DataView(array.buffer, array.byteOffset, array.byteLength); })
const value = view[this.type.dataViewGetter]( .then(array => {
0, const view = new DataView(array.buffer, array.byteOffset, array.byteLength);
options.littleEndian const value = view[this.type.dataViewGetter](
); 0,
return this.create(options, struct, value as any); options.littleEndian
}).valueOrPromise(); );
return this.create(options, struct, value as any);
})
.valueOrPromise();
} }
} }