feat: migrate scrcpy to streams

This commit is contained in:
Simon Chan 2022-02-16 23:43:41 +08:00
parent 8b78c9c331
commit b7725567a6
21 changed files with 308 additions and 460 deletions

View file

@ -1,8 +1,7 @@
import { AsyncOperationManager } from '@yume-chan/async';
import { AutoDisposable, EventEmitter } from '@yume-chan/event';
import { AdbBackend } from '../backend';
import { AdbCommand, AdbPacket, AdbPacketInit, AdbPacketSerializeStream } from '../packet';
import { AbortController, decodeUtf8, encodeUtf8, StructDeserializeStream, WritableStream, WritableStreamDefaultWriter } from '../utils';
import { AbortController, decodeUtf8, encodeUtf8, ReadableStream, StructDeserializeStream, TransformStream, WritableStream, WritableStreamDefaultWriter } from '../utils';
import { AdbSocketController } from './controller';
import { AdbLogger } from './logger';
import { AdbSocket } from './socket';
@ -32,7 +31,6 @@ export class AdbPacketDispatcher extends AutoDisposable {
private readonly sockets = new Map<number, AdbSocketController>();
private readonly logger: AdbLogger | undefined;
public readonly backend: AdbBackend;
private _packetSerializeStream!: AdbPacketSerializeStream;
private _packetSerializeStreamWriter!: WritableStreamDefaultWriter<AdbPacketInit>;
@ -50,15 +48,62 @@ export class AdbPacketDispatcher extends AutoDisposable {
private readonly errorEvent = this.addDisposable(new EventEmitter<Error>());
public get onError() { return this.errorEvent.event; }
private _running = false;
public get running() { return this._running; }
private _runningAbortController!: AbortController;
private _abortController = new AbortController();
public constructor(backend: AdbBackend, logger?: AdbLogger) {
public constructor(readable: ReadableStream<ArrayBuffer>, writable: WritableStream<ArrayBuffer>, logger?: AdbLogger) {
super();
this.backend = backend;
this.logger = logger;
readable
.pipeThrough(new TransformStream(), { signal: this._abortController.signal })
.pipeThrough(new StructDeserializeStream(AdbPacket))
.pipeTo(new WritableStream({
write: async (packet) => {
try {
this.logger?.onIncomingPacket?.(packet);
switch (packet.command) {
case AdbCommand.OK:
this.handleOk(packet);
return;
case AdbCommand.Close:
await this.handleClose(packet);
return;
case AdbCommand.Write:
if (this.sockets.has(packet.arg1)) {
await this.sockets.get(packet.arg1)!.enqueue(packet.payload!);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
}
// Maybe the device is responding to a packet of last connection
// Just ignore it
return;
case AdbCommand.Open:
await this.handleOpen(packet);
return;
}
const args: AdbPacketReceivedEventArgs = {
handled: false,
packet,
};
this.packetEvent.fire(args);
if (!args.handled) {
this.dispose();
throw new Error(`Unhandled packet with command '${packet.command}'`);
}
} catch (e) {
readable.cancel(e);
writable.abort(e);
this.errorEvent.fire(e as Error);
}
}
}));
this._packetSerializeStream = new AdbPacketSerializeStream();
this._packetSerializeStream.readable.pipeTo(writable, { signal: this._abortController.signal });
this._packetSerializeStreamWriter = this._packetSerializeStream.writable.getWriter();
}
private handleOk(packet: AdbPacket) {
@ -143,70 +188,6 @@ export class AdbPacketDispatcher extends AutoDisposable {
}
}
public start() {
this._running = true;
this._runningAbortController = new AbortController();
this.backend.readable!
.pipeThrough(
new StructDeserializeStream(AdbPacket),
{
preventAbort: true,
preventCancel: true,
signal: this._runningAbortController.signal,
}
)
.pipeTo(new WritableStream({
write: async (packet) => {
try {
this.logger?.onIncomingPacket?.(packet);
switch (packet.command) {
case AdbCommand.OK:
this.handleOk(packet);
return;
case AdbCommand.Close:
await this.handleClose(packet);
return;
case AdbCommand.Write:
if (this.sockets.has(packet.arg1)) {
await this.sockets.get(packet.arg1)!.enqueue(packet.payload!);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
}
// Maybe the device is responding to a packet of last connection
// Just ignore it
return;
case AdbCommand.Open:
await this.handleOpen(packet);
return;
}
const args: AdbPacketReceivedEventArgs = {
handled: false,
packet,
};
this.packetEvent.fire(args);
if (!args.handled) {
this.dispose();
throw new Error(`Unhandled packet with command '${packet.command}'`);
}
} catch (e) {
if (!this._running) {
// ignore error when not running
return;
}
this.errorEvent.fire(e as Error);
}
}
}));
this._packetSerializeStream = new AdbPacketSerializeStream();
this._packetSerializeStream.readable.pipeTo(this.backend.writable!);
this._packetSerializeStreamWriter = this._packetSerializeStream.writable.getWriter();
}
public async createSocket(serviceString: string): Promise<AdbSocket> {
if (this.appendNullToServiceString) {
serviceString += '\0';
@ -263,14 +244,12 @@ export class AdbPacketDispatcher extends AutoDisposable {
}
public override dispose() {
this._running = false;
this._runningAbortController.abort();
for (const socket of this.sockets.values()) {
socket.dispose();
}
this.sockets.clear();
this._abortController.abort();
super.dispose();
}
}