fix: fix issues in stream state

This commit is contained in:
Simon Chan 2022-02-18 18:19:28 +08:00
parent 2a5843eb20
commit 5bc3d49334
14 changed files with 172 additions and 171 deletions

View file

@ -37,7 +37,9 @@ export class ProgressStream extends TransformStream<ArrayBuffer, ArrayBuffer> {
let StreamSaver: typeof import('streamsaver');
if (typeof window !== 'undefined') {
const { publicRuntimeConfig } = getConfig();
StreamSaver = require('streamsaver');
// Can't use `import` here because ESM is read-only (can't set `mitm` field)
// Add `await` here because top-level await is on, so every import can be a `Promise`
StreamSaver = await require('streamsaver');
StreamSaver.mitm = publicRuntimeConfig.basePath + '/StreamSaver/mitm.html';
}
@ -186,13 +188,21 @@ class FileManagerState {
(async () => {
const sync = await globalState.device!.sync();
try {
const itemPath = path.resolve(this.path, this.selectedItems[0].name!);
const readableStream = await sync.read(itemPath);
const item = this.selectedItems[0];
const itemPath = path.resolve(this.path, item.name);
const readable = await sync.read(itemPath);
const writeable = StreamSaver!.createWriteStream(this.selectedItems[0].name!, {
size: this.selectedItems[0].size,
});
await readableStream.pipeTo(writeable);
const writeable = StreamSaver!.createWriteStream(
item.name,
{ size: item.size }
);
await readable
.pipeThrough(new TransformStream({
transform(chunk, controller) {
controller.enqueue(new Uint8Array(chunk));
},
}))
.pipeTo(writeable);
} catch (e) {
globalState.showErrorDialog(e instanceof Error ? e.message : `${e}`);
} finally {

View file

@ -4,7 +4,7 @@ import { action, autorun, computed, makeAutoObservable } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
import Head from "next/head";
import React, { useCallback, useEffect, useRef } from 'react';
import { useCallback, useEffect, useRef } from 'react';
import { CommandBar, DemoModePanel, DeviceView } from '../components';
import { globalState } from "../state";
import { Icons, RouteStackProps } from "../utils";

View file

@ -5,9 +5,8 @@ import { action, makeAutoObservable, observable, runInAction } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
import Head from "next/head";
import React from "react";
import { globalState } from "../state";
import { chunkFile, pickFile, RouteStackProps } from "../utils";
import { pickFile, RouteStackProps } from "../utils";
import { ProgressStream } from './file-manager';
enum Stage {

View file

@ -1,8 +1,8 @@
import { CommandBar, Dialog, Dropdown, ICommandBarItemProps, Icon, IconButton, IDropdownOption, LayerHost, Position, ProgressIndicator, SpinButton, Stack, Toggle, TooltipHost } from "@fluentui/react";
import { useId } from "@fluentui/react-hooks";
import { TransformStream } from '@yume-chan/adb';
import { ADB_SYNC_MAX_PACKET_SIZE, ChunkStream, TransformStream } from '@yume-chan/adb';
import { EventEmitter } from "@yume-chan/event";
import { AndroidKeyCode, AndroidKeyEventAction, AndroidMotionEventAction, CodecOptions, DEFAULT_SERVER_PATH, H264Decoder, H264DecoderConstructor, pushServerStream, ScrcpyClient, ScrcpyLogLevel, ScrcpyOptions1_22, ScrcpyScreenOrientation, TinyH264Decoder, WebCodecsDecoder } from "@yume-chan/scrcpy";
import { AndroidKeyCode, AndroidKeyEventAction, AndroidMotionEventAction, CodecOptions, DEFAULT_SERVER_PATH, H264Decoder, H264DecoderConstructor, pushServer, ScrcpyClient, ScrcpyLogLevel, ScrcpyOptions1_22, ScrcpyScreenOrientation, TinyH264Decoder, WebCodecsDecoder } from "@yume-chan/scrcpy";
import SCRCPY_SERVER_VERSION from '@yume-chan/scrcpy/bin/version';
import { action, autorun, makeAutoObservable, observable, runInAction } from "mobx";
import { observer } from "mobx-react-lite";
@ -12,6 +12,7 @@ import React, { useEffect, useMemo, useState } from "react";
import { DemoModePanel, DeviceView, DeviceViewRef, ExternalLink } from "../components";
import { globalState } from "../state";
import { CommonStackTokens, formatSpeed, Icons, RouteStackProps } from "../utils";
import { ProgressStream } from "./file-manager";
const SERVER_URL = new URL('@yume-chan/scrcpy/bin/scrcpy-server?url', import.meta.url).toString();
@ -407,8 +408,17 @@ class ScrcpyPageState {
}), 1000);
try {
const writable = await pushServerStream(globalState.device);
// TODO: Scrcpy: push server
await new ReadableStream({
start(controller) {
controller.enqueue(serverBuffer);
controller.close();
},
})
.pipeThrough(new ChunkStream(ADB_SYNC_MAX_PACKET_SIZE))
.pipeThrough(new ProgressStream((progress) => {
this.serverUploadedSize = progress;
}))
.pipeTo(pushServer(globalState.device));
runInAction(() => {
this.serverUploadSpeed = this.serverUploadedSize - this.debouncedServerUploadedSize;
@ -440,8 +450,13 @@ class ScrcpyPageState {
// Run scrcpy once will delete the server file
// Re-push it
const writable = await pushServerStream(globalState.device);
// TODO: Scrcpy: push server
await new ReadableStream({
start(controller) {
controller.enqueue(serverBuffer);
controller.close();
},
})
.pipeTo(pushServer(globalState.device));
const factory = this.selectedDecoder.factory;
const decoder = new factory();

View file

@ -3,14 +3,14 @@ import { reaction } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
import Head from "next/head";
import React, { CSSProperties, useCallback, useEffect, useRef, useState } from 'react';
import { CSSProperties, useCallback, useEffect, useRef, useState } from 'react';
import 'xterm/css/xterm.css';
import { globalState } from "../state";
import { Icons, ResizeObserver, RouteStackProps } from '../utils';
let terminal: import('../components/terminal').AdbTerminal;
if (typeof window !== 'undefined') {
const AdbTerminal: typeof import('../components/terminal').AdbTerminal = require('../components/terminal').AdbTerminal;
const AdbTerminal: typeof import('../components/terminal').AdbTerminal = (await import('../components/terminal')).AdbTerminal;
terminal = new AdbTerminal();
}

View file

@ -5,7 +5,7 @@ import { autorun, makeAutoObservable, runInAction } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
import Head from "next/head";
import React, { useCallback, useEffect } from "react";
import { useCallback, useEffect } from "react";
import { ExternalLink } from "../components";
import { globalState } from "../state";
import { asyncEffect, Icons, RouteStackProps } from "../utils";
@ -190,14 +190,12 @@ const TcpIp: NextPage = () => {
offText="Disabled"
onChange={handleServicePortEnabledChange}
/>
{globalState && (
<TextField
disabled={!!state.serviceListenAddresses}
value={state.servicePort}
styles={{ root: { width: 300 } }}
onChange={handleServicePortChange}
/>
)}
<TextField
disabled={!globalState.device || !!state.serviceListenAddresses}
value={state.servicePort}
styles={{ root: { width: 300 } }}
onChange={handleServicePortChange}
/>
</StackItem>
<StackItem>

View file

@ -63,8 +63,9 @@ function createStream (port) {
controller.enqueue(data)
}
},
cancel () {
console.log('user aborted')
cancel (reason) {
console.log('user aborted', reason)
port.postMessage({ abort: true })
}
})
}

View file

@ -28,9 +28,3 @@ export function pickFile(options: { multiple?: boolean; } & PickFileOptions): Pr
input.click();
});
}
export async function* chunkFile(file: File, chunkSize: number): AsyncGenerator<ArrayBuffer, void, void> {
for (let i = 0; i < file.size; i += chunkSize) {
yield file.slice(i, i + chunkSize, file.type).arrayBuffer();
}
}

View file

@ -113,10 +113,7 @@ export class AdbSync extends AutoDisposable {
* @param filename The full path of the file on device to write.
* @param mode The unix permissions of the file.
* @param mtime The modified time of the file.
* @returns
* A promise that resolves to a `WritableStream`.
*
* If the promise doesn't resolve immediately, it means the sync object is busy processing another command.
* @returns A `WritableStream` that writes to the file.
*/
public write(
filename: string,

View file

@ -1,6 +1,7 @@
import { PromiseResolver } from "@yume-chan/async";
import { AutoDisposable } from '@yume-chan/event';
import { AdbCommand } from '../packet';
import { AutoResetEvent, chunkArrayLike, TransformStream, WritableStream, WritableStreamDefaultWriter } from '../utils';
import { ChunkStream, TransformStream, WritableStream, WritableStreamDefaultWriter } from '../utils';
import { AdbPacketDispatcher } from './dispatcher';
export interface AdbSocketInfo {
@ -27,9 +28,6 @@ export interface AdbSocketConstructionOptions {
}
export class AdbSocketController extends AutoDisposable implements AdbSocketInfo {
private readonly writeChunkLock = this.addDisposable(new AutoResetEvent());
private readonly writeLock = this.addDisposable(new AutoResetEvent());
private readonly dispatcher!: AdbPacketDispatcher;
public readonly localId!: number;
@ -37,12 +35,15 @@ export class AdbSocketController extends AutoDisposable implements AdbSocketInfo
public readonly localCreated!: boolean;
public readonly serviceString!: string;
private readonly _passthrough: TransformStream<ArrayBuffer, ArrayBuffer>;
private readonly _passthroughWriter: WritableStreamDefaultWriter<ArrayBuffer>;
public get readable() { return this._passthrough.readable; }
private readonly _readablePassthrough: TransformStream<ArrayBuffer, ArrayBuffer>;
private readonly _readablePassthroughWriter: WritableStreamDefaultWriter<ArrayBuffer>;
public get readable() { return this._readablePassthrough.readable; }
private _writePromise: PromiseResolver<void> | undefined;
public readonly writable: WritableStream<ArrayBuffer>;
private _writableClosed = false;
private _closed = false;
public get closed() { return this._closed; }
@ -50,83 +51,76 @@ export class AdbSocketController extends AutoDisposable implements AdbSocketInfo
super();
Object.assign(this, options);
this._passthrough = new TransformStream({}, {
highWaterMark: options.highWaterMark ?? 16 * 1024,
size(chunk) { return chunk.byteLength; }
});
this._passthroughWriter = this._passthrough.writable.getWriter();
// Check this image to help you understand the stream graph
// cspell: disable-next-line
// https://www.plantuml.com/plantuml/png/TL0zoeGm4ErpYc3l5JxyS0yWM6mX5j4C6p4cxcJ25ejttuGX88ZftizxUKmJI275pGhXl0PP_UkfK_CAz5Z2hcWsW9Ny2fdU4C1f5aSchFVxA8vJjlTPRhqZzDQMRB7AklwJ0xXtX0ZSKH1h24ghoKAdGY23FhxC4nS2pDvxzIvxb-8THU0XlEQJ-ZB7SnXTAvc_LhOckhMdLBnbtndpb-SB7a8q2SRD_W00
this.writable = new WritableStream({
write: async (chunk) => {
await this.write(chunk);
},
close: () => {
this.close();
},
}, {
this._readablePassthrough = new TransformStream({}, {
highWaterMark: options.highWaterMark ?? 16 * 1024,
size(chunk) { return chunk.byteLength; }
});
this._readablePassthroughWriter = this._readablePassthrough.writable.getWriter();
const writablePassthrough = new TransformStream();
writablePassthrough.readable
.pipeThrough(new ChunkStream(this.dispatcher.maxPayloadSize))
.pipeTo(new WritableStream<ArrayBuffer>({
write: async (chunk) => {
// Wait for an ack packet
this._writePromise = new PromiseResolver();
await this.dispatcher.sendPacket(
AdbCommand.Write,
this.localId,
this.remoteId,
chunk
);
await this._writePromise.promise;
},
close: async () => {
this._writableClosed = true;
await this.close();
},
}));
this.writable = writablePassthrough.writable;
}
public enqueue(packet: ArrayBuffer) {
return this._passthroughWriter.write(packet);
}
private async writeChunk(data: ArrayBuffer): Promise<void> {
try {
// Wait for an ack packet
await this.writeChunkLock.wait();
} catch {
// Lock has been disposed, which means the socket has been closed
throw new Error('Can not write after closed');
}
await this.dispatcher.sendPacket(
AdbCommand.Write,
this.localId,
this.remoteId,
data
);
}
public async write(data: ArrayBuffer): Promise<void> {
try {
// Keep write operations in order
await this.writeLock.wait();
} catch {
// Lock has been disposed, which means the socket has been closed
throw new Error('Can not write after closed');
}
for await (const chunk of chunkArrayLike(data, this.dispatcher.maxPayloadSize)) {
await this.writeChunk(chunk);
}
this.writeLock.notify();
return this._readablePassthroughWriter.write(packet);
}
public ack() {
this.writeChunkLock.notify();
this._writePromise?.resolve();
}
public async close(): Promise<void> {
if (!this._closed) {
// prevent nested calls
this._closed = true;
// Immediately cancel all pending writes
this.writeLock.dispose();
this.writeChunkLock.dispose();
await this.dispatcher.sendPacket(AdbCommand.Close, this.localId, this.remoteId);
this._passthroughWriter.close();
if (!this._writableClosed) {
// Disallow more data to be written
this.writable.close();
// Error out the pending writes
this._writePromise?.reject(new Error('Socket closed'));
}
if (!this._closed) {
// Only send close packet when `close` is called before `dispose`
// (the client initiated the close)
await this.dispatcher.sendPacket(
AdbCommand.Close,
this.localId,
this.remoteId
);
}
}
public override dispose() {
this._closed = true;
// Close `writable` side
this.close();
// Close `readable` side
this._readablePassthroughWriter.close();
super.dispose();
}
}

View file

@ -3,7 +3,6 @@ import { EventEmitter } from '@yume-chan/event';
import Struct from '@yume-chan/struct';
import { AndroidMotionEventAction, ScrcpyControlMessageType, ScrcpyInjectKeyCodeControlMessage, ScrcpyInjectTextControlMessage, ScrcpyInjectTouchControlMessage, type AndroidKeyEventAction } from './message';
import type { ScrcpyInjectScrollControlMessage1_22, ScrcpyOptions, VideoStreamPacket } from "./options";
import { PushServerOptions, pushServerStream } from "./push-server";
function* splitLines(text: string): Generator<string, void, void> {
let start = 0;
@ -27,15 +26,8 @@ const ClipboardMessage =
.string('content', { lengthField: 'length' });
export class ScrcpyClient {
public static pushServerStream(
device: Adb,
options?: PushServerOptions
) {
return pushServerStream(device, options);
}
public static async getEncoders(
device: Adb,
adb: Adb,
path: string,
version: string,
options: ScrcpyOptions<any>
@ -48,7 +40,7 @@ export class ScrcpyClient {
// Scrcpy server will open connections, before initializing encoder
// Thus although an invalid encoder name is given, the start process will success
const client = await ScrcpyClient.start(device, path, version, options);
const client = await ScrcpyClient.start(adb, path, version, options);
const encoderNameRegex = options.getOutputEncoderNameRegex();
const encoders: string[] = [];
@ -65,18 +57,18 @@ export class ScrcpyClient {
}
public static async start(
device: Adb,
adb: Adb,
path: string,
version: string,
options: ScrcpyOptions<any>
) {
const connection = options.createConnection(device);
const connection = options.createConnection(adb);
let process: AdbSubprocessProtocol | undefined;
try {
await connection.initialize();
process = await device.subprocess.spawn(
process = await adb.subprocess.spawn(
[
// cspell: disable-next-line
`CLASSPATH=${path}`,
@ -103,7 +95,7 @@ export class ScrcpyClient {
}
const [videoStream, controlStream] = result;
return new ScrcpyClient(device, options, process, videoStream, controlStream);
return new ScrcpyClient(adb, options, process, videoStream, controlStream);
} catch (e) {
await process?.kill();
throw e;
@ -112,11 +104,14 @@ export class ScrcpyClient {
}
}
private _adb: Adb;
public get adb() { return this._adb; }
private options: ScrcpyOptions<any>;
private process: AdbSubprocessProtocol;
private _stdout: TransformStream<string, string>;
public get stdout() { return this._stdout.readable; }
private _stdout: ReadableStream<string>;
public get stdout() { return this._stdout; }
public get exit() { return this.process.exit; }
@ -137,39 +132,43 @@ export class ScrcpyClient {
private sendingTouchMessage = false;
public constructor(
device: Adb,
adb: Adb,
options: ScrcpyOptions<any>,
process: AdbSubprocessProtocol,
videoStream: AdbBufferedStream,
controlStream: AdbSocket | undefined,
) {
this._adb = adb;
this.options = options;
this.process = process;
this._stdout = new TransformStream({
transform(chunk, controller) {
for (const line of splitLines(chunk)) {
if (line === '') {
continue;
}
controller.enqueue(line);
}
},
});
process.stdout
this._stdout = process.stdout
.pipeThrough(new DecodeUtf8Stream())
.pipeThrough(this._stdout);
.pipeThrough(new TransformStream({
transform(chunk, controller) {
for (const line of splitLines(chunk)) {
if (line === '') {
continue;
}
controller.enqueue(line);
}
},
}));
this._videoStream = new TransformStream();
const videoStreamWriter = this._videoStream.writable.getWriter();
(async () => {
while (true) {
const packet = await options.parseVideoStream(videoStream);
if (packet.type === 'configuration') {
this._screenWidth = packet.data.croppedWidth;
this._screenHeight = packet.data.croppedHeight;
try {
while (true) {
const packet = await options.parseVideoStream(videoStream);
if (packet.type === 'configuration') {
this._screenWidth = packet.data.croppedWidth;
this._screenHeight = packet.data.croppedHeight;
}
videoStreamWriter.write(packet);
}
videoStreamWriter.write(packet);
} catch {
videoStreamWriter.close();
}
})();
@ -177,16 +176,20 @@ export class ScrcpyClient {
const buffered = new AdbBufferedStream(controlStream);
this._controlStreamWriter = controlStream.writable.getWriter();
(async () => {
while (true) {
const type = await buffered.read(1);
switch (new Uint8Array(type)[0]) {
case 0:
const { content } = await ClipboardMessage.deserialize(buffered);
this.clipboardChangeEvent.fire(content!);
break;
default:
throw new Error('unknown control message type');
try {
while (true) {
const type = await buffered.read(1);
switch (new Uint8Array(type)[0]) {
case 0:
const { content } = await ClipboardMessage.deserialize(buffered);
this.clipboardChangeEvent.fire(content!);
break;
default:
throw new Error('unknown control message type');
}
}
} catch {
// TODO: Scrcpy: handle error
}
})();
}
@ -272,7 +275,7 @@ export class ScrcpyClient {
}
public async close() {
this._controlStreamWriter?.close();
// No need to close streams. Kill the process will destroy them from the other side.
await this.process?.kill();
}
}

View file

@ -195,7 +195,7 @@ export class ScrcpyOptions1_16<T extends ScrcpyOptionsInit1_16 = ScrcpyOptionsIn
.map(key => toScrcpyOptionValue(this.value[key] || defaults[key], '-'));
}
public createConnection(device: Adb): ScrcpyClientConnection {
public createConnection(adb: Adb): ScrcpyClientConnection {
const options: ScrcpyClientConnectionOptions = {
// Old scrcpy connection always have control stream no matter what the option is
control: true,
@ -203,9 +203,9 @@ export class ScrcpyOptions1_16<T extends ScrcpyOptionsInit1_16 = ScrcpyOptionsIn
sendDeviceMeta: true,
};
if (this.value.tunnelForward) {
return new ScrcpyClientForwardConnection(device, options);
return new ScrcpyClientForwardConnection(adb, options);
} else {
return new ScrcpyClientReverseConnection(device, options);
return new ScrcpyClientReverseConnection(adb, options);
}
}

View file

@ -47,7 +47,7 @@ export interface ScrcpyOptions<T> {
getOutputEncoderNameRegex(): RegExp;
createConnection(device: Adb): ScrcpyClientConnection;
createConnection(adb: Adb): ScrcpyClientConnection;
parseVideoStream(stream: AdbBufferedStream): Promise<VideoStreamPacket>;

View file

@ -1,35 +1,25 @@
import { Adb, AdbSync, WritableStream, WritableStreamDefaultWriter } from "@yume-chan/adb";
import { Adb, AdbSync, HookWritableStream, WritableStream } from "@yume-chan/adb";
import { DEFAULT_SERVER_PATH } from "./options";
export interface PushServerOptions {
path?: string;
}
export async function pushServerStream(
export function pushServer(
device: Adb,
options: PushServerOptions = {}
) {
const { path = DEFAULT_SERVER_PATH } = options;
let sync!: AdbSync;
let writable!: WritableStream<ArrayBuffer>;
let writer!: WritableStreamDefaultWriter<ArrayBuffer>;
return new WritableStream<ArrayBuffer>({
return new HookWritableStream<ArrayBuffer, WritableStream<ArrayBuffer>, AdbSync>({
async start() {
sync = await device.sync();
writable = sync.write(path);
writer = writable.getWriter();
const sync = await device.sync();
return {
writable: sync.write(path),
state: sync,
};
},
async write(chunk: ArrayBuffer) {
await writer.ready;
await writer.write(chunk);
},
async abort(e) {
await writer.abort(e);
sync.dispose();
},
async close() {
await writer.close();
async close(sync) {
await sync.dispose();
},
});