feat: move more things to stream

This commit is contained in:
Simon Chan 2022-02-15 18:42:30 +08:00
parent 68774620c3
commit 7f94da06e7
25 changed files with 478 additions and 398 deletions

View file

@ -1,4 +1,4 @@
import { AdbBackend, BufferedStream, Stream } from '@yume-chan/adb';
import { AdbBackend, ReadableStream, TransformStream, WritableStream } from '@yume-chan/adb';
import { EventEmitter } from '@yume-chan/event';
declare global {
@ -44,9 +44,24 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
public name: string | undefined;
private socket: TCPSocket | undefined;
private reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
private bufferedStream: BufferedStream<Stream> | undefined;
private writer: WritableStreamDefaultWriter<Uint8Array> | undefined;
private _readablePassthrough = new TransformStream<Uint8Array, ArrayBuffer>({
transform(chunk, controller) {
controller.enqueue(chunk.buffer);
},
});
public get readable(): ReadableStream<ArrayBuffer> {
return this._readablePassthrough.readable;
}
private _writablePassthrough = new TransformStream<ArrayBuffer, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(new Uint8Array(chunk));
},
});
public get writable(): WritableStream<ArrayBuffer> {
return this._writablePassthrough.writable;
}
private _connected = false;
public get connected() { return this._connected; }
@ -69,29 +84,12 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
});
this.socket = socket;
this.reader = this.socket.readable.getReader();
this.bufferedStream = new BufferedStream({
read: async () => {
const result = await this.reader!.read();
if (result.value) {
return result.value.buffer;
}
throw new Error('Stream ended');
}
});
this.writer = this.socket.writable.getWriter();
this.socket.readable.pipeTo(this._readablePassthrough.writable);
this._writablePassthrough.readable.pipeTo(this.socket.writable);
this._connected = true;
}
public write(buffer: ArrayBuffer): Promise<void> {
return this.writer!.write(new Uint8Array(buffer));
}
public async read(length: number): Promise<ArrayBuffer> {
return this.bufferedStream!.read(length);
}
public dispose(): void | Promise<void> {
this.socket?.close();
this._connected = false;

View file

@ -1,6 +1,6 @@
# @yume-chan/adb-backend-webusb
Backend for `@yume-chan/adb` using WebUSB API.
Backend for `@yume-chan/adb` using WebUSB ([MDN](https://developer.mozilla.org/en-US/docs/Web/API/USB), [Spec](https://wicg.github.io/webusb)) API.
- [Note](#note)
- [`pickDevice`](#pickdevice)
@ -9,7 +9,7 @@ Backend for `@yume-chan/adb` using WebUSB API.
## Note
WebUSB API requires a [secure context](https://developer.mozilla.org/en-US/docs/Web/Security/Secure_Contexts) (basically means HTTPS).
WebUSB API requires [secure context](https://developer.mozilla.org/en-US/docs/Web/Security/Secure_Contexts) (basically means HTTPS).
Chrome will treat `localhost` as secure, but if you want to access a dev server running on another machine, you need to add the domain to the allowlist:
@ -24,8 +24,6 @@ Chrome will treat `localhost` as secure, but if you want to access a dev server
static async pickDevice(): Promise<AdbWebBackend | undefined>
```
**WebUSB API** ([MDN](https://developer.mozilla.org/en-US/docs/Web/API/USB)) ([Spec](https://wicg.github.io/webusb))
Request browser to present a list of connected Android devices to let the user choose from.
Returns `undefined` if the user canceled the picker.
@ -36,12 +34,8 @@ Returns `undefined` if the user canceled the picker.
static async fromDevice(device: USBDevice): Promise<AdbWebBackend>
```
**WebUSB API** ([MDN](https://developer.mozilla.org/en-US/docs/Web/API/USB)) ([Spec](https://wicg.github.io/webusb))
Create an `AdbWebBackend` instance from an exist `USBDevice` instance.
## `read`/`write`
**WebUSB API** ([MDN](https://developer.mozilla.org/en-US/docs/Web/API/USB)) ([Spec](https://wicg.github.io/webusb))
Read/write data from/to the underlying `USBDevice` instance.

View file

@ -1,4 +1,4 @@
import { AdbBackend } from '@yume-chan/adb';
import { AdbBackend, ReadableStream } from '@yume-chan/adb';
import { EventEmitter } from '@yume-chan/event';
export const WebUsbDeviceFilter: USBDeviceFilter = {
@ -43,8 +43,11 @@ export class AdbWebUsbBackend implements AdbBackend {
private readonly disconnectEvent = new EventEmitter<void>();
public readonly onDisconnected = this.disconnectEvent.event;
private _inEndpointNumber!: number;
private _outEndpointNumber!: number;
private _readable: ReadableStream<ArrayBuffer> | undefined;
public get readable() { return this._readable; }
private _writable: WritableStream<ArrayBuffer> | undefined;
public get writable() { return this._writable; }
public constructor(device: USBDevice) {
this._device = device;
@ -84,15 +87,38 @@ export class AdbWebUsbBackend implements AdbBackend {
for (const endpoint of alternate.endpoints) {
switch (endpoint.direction) {
case 'in':
this._inEndpointNumber = endpoint.endpointNumber;
if (this._outEndpointNumber !== undefined) {
this._readable = new ReadableStream({
pull: async (controller) => {
let result = await this._device.transferIn(endpoint.endpointNumber, endpoint.packetSize);
if (result.status === 'stall') {
// https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/client/usb_osx.cpp#543
await this._device.clearHalt('in', endpoint.endpointNumber);
result = await this._device.transferIn(endpoint.endpointNumber, endpoint.packetSize);
}
const { buffer } = result.data!;
controller.enqueue(buffer);
},
cancel: () => {
this.dispose();
},
});
if (this._writable !== undefined) {
this._connected = true;
return;
}
break;
case 'out':
this._outEndpointNumber = endpoint.endpointNumber;
if (this._inEndpointNumber !== undefined) {
this._writable = new WritableStream({
write: async (chunk) => {
await this._device.transferOut(endpoint.endpointNumber, chunk);
},
close: () => {
this.dispose();
},
});
if (this.readable !== undefined) {
this._connected = true;
return;
}
@ -107,21 +133,6 @@ export class AdbWebUsbBackend implements AdbBackend {
throw new Error('Unknown error');
}
public async write(buffer: ArrayBuffer): Promise<void> {
await this._device.transferOut(this._outEndpointNumber, buffer);
}
public async read(length: number): Promise<ArrayBuffer> {
const result = await this._device.transferIn(this._inEndpointNumber, length);
if (result.status === 'stall') {
await this._device.clearHalt('in', this._inEndpointNumber);
}
const { buffer } = result.data!;
return buffer;
}
public async dispose() {
this._connected = false;
window.navigator.usb.removeEventListener('disconnect', this.handleDisconnect);

View file

@ -38,12 +38,8 @@
"tslib": "^2.3.1"
},
"devDependencies": {
"@types/node": "^17.0.17",
"jest": "^26.6.3",
"typescript": "^4.5.5",
"@yume-chan/ts-package-builder": "^1.0.0"
},
"peerDependencies": {
"@types/node": "^17.0.17"
}
}

View file

@ -6,7 +6,7 @@ import { AdbChildProcess, AdbFrameBuffer, AdbPower, AdbReverseCommand, AdbSync,
import { AdbFeatures } from './features';
import { AdbCommand } from './packet';
import { AdbLogger, AdbPacketDispatcher, AdbSocket } from './socket';
import { decodeUtf8 } from "./utils";
import { decodeUtf8, ReadableStream } from "./utils";
export enum AdbPropKey {
Product = 'ro.product.name',
@ -208,7 +208,7 @@ export class Adb {
}
public async install(
apk: ArrayLike<number> | ArrayBufferLike | AsyncIterable<ArrayBuffer>,
apk: ReadableStream<ArrayBuffer>,
onProgress?: (uploaded: number) => void,
): Promise<void> {
return await install(this, apk, onProgress);
@ -229,13 +229,11 @@ export class Adb {
public async createSocketAndReadAll(service: string): Promise<string> {
const socket = await this.createSocket(service);
const resolver = new PromiseResolver<string>();
let result = '';
socket.onData(buffer => {
result += decodeUtf8(buffer);
});
socket.onClose(() => resolver.resolve(result));
return resolver.promise;
for await (const chunk of socket.readable) {
result += decodeUtf8(chunk);
}
return result;
}
public async dispose(): Promise<void> {

View file

@ -1,5 +1,6 @@
import type { Event } from '@yume-chan/event';
import type { ValueOrPromise } from '@yume-chan/struct';
import type { ReadableStream, WritableStream } from "./utils";
export interface AdbBackend {
readonly serial: string;
@ -10,11 +11,11 @@ export interface AdbBackend {
readonly onDisconnected: Event<void>;
readonly readable: ReadableStream<ArrayBuffer> | undefined;
readonly writable: WritableStream<ArrayBuffer> | undefined;
connect?(): ValueOrPromise<void>;
read(length: number): ValueOrPromise<ArrayBuffer>;
write(buffer: ArrayBuffer): ValueOrPromise<void>;
dispose(): ValueOrPromise<void>;
}

View file

@ -1,16 +1,18 @@
import { Adb } from "../adb";
import { ReadableStream } from "../utils";
import { escapeArg } from "./shell";
export async function install(
adb: Adb,
apk: ArrayLike<number> | ArrayBufferLike | AsyncIterable<ArrayBuffer>,
apk: ReadableStream<ArrayBuffer>,
onProgress?: (uploaded: number) => void,
): Promise<void> {
const filename = `/data/local/tmp/${Math.random().toString().substring(2)}.apk`;
// Upload apk file to tmp folder
const sync = await adb.sync();
await sync.write(filename, apk, undefined, undefined, onProgress);
const writable = sync.write(filename, undefined, undefined, onProgress);
await apk.pipeTo(writable);
sync.dispose();
// Invoke `pm install` to install it

View file

@ -1,6 +1,5 @@
import { once } from "@yume-chan/event";
import type { Adb } from '../../adb';
import { decodeUtf8 } from "../../utils";
import { decodeUtf8, WritableStream } from "../../utils";
import { AdbLegacyShell } from './legacy';
import { AdbShellProtocol } from './protocol';
import type { AdbShell, AdbShellConstructor } from './types';
@ -93,9 +92,17 @@ export class AdbChildProcess {
// Optimization: rope (concat strings) is faster than `[].join('')`
let stdout = '';
let stderr = '';
shell.onStdout(buffer => stdout += decodeUtf8(buffer));
shell.onStderr(buffer => stderr += decodeUtf8(buffer));
const exitCode = await once(shell.onExit);
shell.stdout.pipeTo(new WritableStream({
write(chunk) {
stdout += decodeUtf8(chunk);
}
}));
shell.stderr.pipeTo(new WritableStream({
write(chunk) {
stderr += decodeUtf8(chunk);
}
}));
const exitCode = await shell.exit;
return {
stdout,
stderr,

View file

@ -1,6 +1,6 @@
import { EventEmitter } from "@yume-chan/event";
import type { Adb } from "../../adb";
import type { AdbSocket } from "../../socket";
import { ReadableStream } from "../../utils";
import type { AdbShell } from "./types";
/**
@ -20,33 +20,24 @@ export class AdbLegacyShell implements AdbShell {
private readonly socket: AdbSocket;
private readonly stdoutEvent = new EventEmitter<ArrayBuffer>();
public get onStdout() { return this.stdoutEvent.event; }
// Legacy shell forwards all data to stdin.
public get stdin() { return this.socket.writable; }
private readonly stderrEvent = new EventEmitter<ArrayBuffer>();
public get onStderr() { return this.stderrEvent.event; }
private _stdout: ReadableStream<ArrayBuffer>;
// Legacy shell doesn't support splitting output streams.
public get stdout() { return this._stdout; }
private readonly exitEvent = new EventEmitter<number>();
public get onExit() { return this.exitEvent.event; }
// `stderr` of Legacy shell is always empty.
public readonly stderr = new ReadableStream({});
private _exit: Promise<number>;
public get exit() { return this._exit; }
public constructor(socket: AdbSocket) {
this.socket = socket;
this.socket.onData(this.handleData, this);
this.socket.onClose(this.handleExit, this);
}
private handleData(data: ArrayBuffer) {
// Legacy shell doesn't support splitting output streams.
this.stdoutEvent.fire(data);
}
private handleExit() {
// Legacy shell doesn't support returning exit code.
this.exitEvent.fire(0);
}
public async write(data: ArrayBuffer) {
this.socket.write(data);
let exit;
[this._stdout, exit] = this.socket.readable.tee();
this._exit = exit.getReader().closed.then(() => 0);
}
public resize() {

View file

@ -1,10 +1,10 @@
import { EventEmitter } from "@yume-chan/event";
import { PromiseResolver } from "@yume-chan/async";
import Struct, { placeholder } from "@yume-chan/struct";
import type { Adb } from "../../adb";
import { AdbFeatures } from "../../features";
import type { AdbSocket } from "../../socket";
import { AdbBufferedStream } from "../../stream";
import { encodeUtf8 } from "../../utils";
import { encodeUtf8, TransformStream, WritableStream, WritableStreamDefaultWriter } from "../../utils";
import type { AdbShell } from "./types";
export enum AdbShellProtocolId {
@ -26,6 +26,37 @@ function assertUnreachable(x: never): never {
throw new Error("Unreachable");
}
class WritableMultiplexer<T> {
private writer: WritableStreamDefaultWriter<T>;
public constructor(writer: WritableStreamDefaultWriter<T>) {
this.writer = writer;
}
public createWritable(): WritableStream<T> {
return new WritableStream({
write: (chunk) => {
return this.writer.write(chunk);
},
});
}
}
class StdinTransformStream extends TransformStream<ArrayBuffer, ArrayBuffer>{
constructor() {
super({
transform(chunk, controller) {
controller.enqueue(AdbShellProtocolPacket.serialize(
{
id: AdbShellProtocolId.Stdin,
data: chunk,
}
));
}
});
}
}
/**
* Shell v2 a.k.a Shell Protocol
*
@ -46,18 +77,29 @@ export class AdbShellProtocol implements AdbShell {
private readonly stream: AdbBufferedStream;
private readonly stdoutEvent = new EventEmitter<ArrayBuffer>();
public get onStdout() { return this.stdoutEvent.event; }
private _writableMultiplexer: WritableMultiplexer<ArrayBuffer>;
private _writer: WritableStreamDefaultWriter<ArrayBuffer>;
private readonly stderrEvent = new EventEmitter<ArrayBuffer>();
public get onStderr() { return this.stderrEvent.event; }
private _stdin = new StdinTransformStream();
public get stdin() { return this._stdin.writable; }
private readonly exitEvent = new EventEmitter<number>();
public get onExit() { return this.exitEvent.event; }
private _stdout = new TransformStream();
private _stdoutWriter = this._stdout.writable.getWriter();
public get stdout() { return this._stdout.readable; }
private _stderr = new TransformStream();
private _stderrWriter = this._stderr.writable.getWriter();
public get stderr() { return this._stderr.readable; }
private readonly _exit = new PromiseResolver<number>();
public get exit() { return this._exit.promise; }
public constructor(socket: AdbSocket) {
this.stream = new AdbBufferedStream(socket);
this.readData();
this._writableMultiplexer = new WritableMultiplexer(socket.writable.getWriter());
this._writer = this._writableMultiplexer.createWritable().getWriter();
this._stdin.readable.pipeTo(this._writableMultiplexer.createWritable());
}
private async readData() {
@ -66,13 +108,13 @@ export class AdbShellProtocol implements AdbShell {
const packet = await AdbShellProtocolPacket.deserialize(this.stream);
switch (packet.id) {
case AdbShellProtocolId.Stdout:
this.stdoutEvent.fire(packet.data);
this._stdoutWriter.write(packet.data);
break;
case AdbShellProtocolId.Stderr:
this.stderrEvent.fire(packet.data);
this._stderrWriter.write(packet.data);
break;
case AdbShellProtocolId.Exit:
this.exitEvent.fire(new Uint8Array(packet.data)[0]!);
this._exit.resolve(new Uint8Array(packet.data)[0]!);
break;
case AdbShellProtocolId.CloseStdin:
case AdbShellProtocolId.Stdin:
@ -88,19 +130,8 @@ export class AdbShellProtocol implements AdbShell {
}
}
public async write(data: ArrayBuffer) {
this.stream.write(
AdbShellProtocolPacket.serialize(
{
id: AdbShellProtocolId.Stdin,
data,
}
)
);
}
public async resize(rows: number, cols: number) {
await this.stream.write(
await this._writer.write(
AdbShellProtocolPacket.serialize(
{
id: AdbShellProtocolId.WindowSizeChange,

View file

@ -1,13 +1,18 @@
import type { Event } from "@yume-chan/event";
import type { ValueOrPromise } from "@yume-chan/struct";
import type { Adb } from "../../adb";
import type { AdbSocket } from "../../socket";
import type { ReadableStream, WritableStream } from "../../utils";
export interface AdbShell {
/**
* Writes raw PTY data into the `stdin` stream.
*/
readonly stdin: WritableStream<ArrayBuffer>;
/**
* Notifies that new data has been written into the `stdout` stream.
*/
readonly onStdout: Event<ArrayBuffer>;
readonly stdout: ReadableStream<ArrayBuffer>;
/**
* Notifies that new data has been written into the `stderr` stream.
@ -15,7 +20,7 @@ export interface AdbShell {
* Some `AdbShell`s may not support separate output streams
* and will always fire the `onStdout` event instead.
*/
readonly onStderr: Event<ArrayBuffer>;
readonly stderr: ReadableStream<ArrayBuffer>;
/**
* Notifies that the current process has exited.
@ -23,12 +28,7 @@ export interface AdbShell {
* The event arg is the exit code.
* Some `AdbShell`s may not support returning exit code and will always return `0` instead.
*/
readonly onExit: Event<number>;
/**
* Writes raw PTY data into the `stdin` stream.
*/
write(data: ArrayBuffer): Promise<void>;
readonly exit: Promise<number>;
/**
* Resizes the current shell.

View file

@ -1,5 +1,6 @@
import Struct from '@yume-chan/struct';
import { AdbBufferedStream } from '../../stream';
import { WritableStreamDefaultWriter } from "../../utils";
import { AdbSyncRequestId, adbSyncWriteRequest } from './request';
import { AdbSyncDoneResponse, adbSyncReadResponse, AdbSyncResponseId } from './response';
import { AdbSyncLstatResponse } from './stat';
@ -20,9 +21,10 @@ const ResponseTypes = {
export async function* adbSyncOpenDir(
stream: AdbBufferedStream,
path: string
writer: WritableStreamDefaultWriter<ArrayBuffer>,
path: string,
): AsyncGenerator<AdbSyncEntryResponse, void, void> {
await adbSyncWriteRequest(stream, AdbSyncRequestId.List, path);
await adbSyncWriteRequest(writer, AdbSyncRequestId.List, path);
while (true) {
const response = await adbSyncReadResponse(stream, ResponseTypes);

View file

@ -1,5 +1,6 @@
import Struct from '@yume-chan/struct';
import { AdbBufferedStream } from '../../stream';
import { ReadableStream, WritableStreamDefaultWriter } from "../../utils";
import { AdbSyncRequestId, adbSyncWriteRequest } from './request';
import { AdbSyncDoneResponse, adbSyncReadResponse, AdbSyncResponseId } from './response';
@ -14,21 +15,35 @@ const ResponseTypes = {
[AdbSyncResponseId.Done]: new AdbSyncDoneResponse(AdbSyncDataResponse.size),
};
export async function* adbSyncPull(
export function adbSyncPull(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
path: string,
): AsyncGenerator<ArrayBuffer, void, void> {
await adbSyncWriteRequest(stream, AdbSyncRequestId.Receive, path);
while (true) {
highWaterMark: number = 16 * 1024,
): ReadableStream<ArrayBuffer> {
return new ReadableStream({
async start(controller) {
try {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Receive, path);
} catch (e) {
controller.error(e);
}
},
async pull(controller) {
const response = await adbSyncReadResponse(stream, ResponseTypes);
switch (response.id) {
case AdbSyncResponseId.Data:
yield response.data!;
controller.enqueue(response.data!);
break;
case AdbSyncResponseId.Done:
controller.close();
return;
default:
throw new Error('Unexpected response id');
controller.error(new Error('Unexpected response id'));
}
}
}, {
highWaterMark,
size(chunk) { return chunk.byteLength; }
});
}

View file

@ -1,6 +1,6 @@
import Struct from '@yume-chan/struct';
import { AdbBufferedStream } from '../../stream';
import { chunkArrayLike, chunkAsyncIterable } from '../../utils';
import { chunkArrayLike, WritableStream, WritableStreamDefaultWriter } from '../../utils';
import { AdbSyncRequestId, adbSyncWriteRequest } from './request';
import { adbSyncReadResponse, AdbSyncResponseId } from './response';
import { LinuxFileType } from './stat';
@ -15,32 +15,31 @@ const ResponseTypes = {
export const AdbSyncMaxPacketSize = 64 * 1024;
export async function adbSyncPush(
export function adbSyncPush(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
filename: string,
content: ArrayLike<number> | ArrayBufferLike | AsyncIterable<ArrayBuffer>,
mode: number = (LinuxFileType.File << 12) | 0o666,
mtime: number = (Date.now() / 1000) | 0,
packetSize: number = AdbSyncMaxPacketSize,
onProgress?: (uploaded: number) => void,
): Promise<void> {
const pathAndMode = `${filename},${mode.toString()}`;
await adbSyncWriteRequest(stream, AdbSyncRequestId.Send, pathAndMode);
let chunkReader: Iterable<ArrayBuffer> | AsyncIterable<ArrayBuffer>;
if ('length' in content || 'byteLength' in content) {
chunkReader = chunkArrayLike(content, packetSize);
} else {
chunkReader = chunkAsyncIterable(content, packetSize);
}
): WritableStream<ArrayBuffer> {
let uploaded = 0;
for await (const buffer of chunkReader) {
await adbSyncWriteRequest(stream, AdbSyncRequestId.Data, buffer);
return new WritableStream({
async start() {
const pathAndMode = `${filename},${mode.toString()}`;
await adbSyncWriteRequest(writer, AdbSyncRequestId.Send, pathAndMode);
},
async write(chunk) {
for (const buffer of chunkArrayLike(chunk, packetSize)) {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Data, buffer);
uploaded += buffer.byteLength;
onProgress?.(uploaded);
}
await adbSyncWriteRequest(stream, AdbSyncRequestId.Done, mtime);
},
async close() {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Done, mtime);
await adbSyncReadResponse(stream, ResponseTypes);
}
});
}

View file

@ -1,6 +1,5 @@
import Struct from '@yume-chan/struct';
import type { AdbBufferedStream } from '../../stream';
import { encodeUtf8 } from "../../utils";
import { encodeUtf8, WritableStreamDefaultWriter } from "../../utils";
export enum AdbSyncRequestId {
List = 'LIST',
@ -24,7 +23,7 @@ export const AdbSyncDataRequest =
.arrayBuffer('data', { lengthField: 'arg' });
export async function adbSyncWriteRequest(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
id: AdbSyncRequestId | string,
value: number | string | ArrayBuffer
): Promise<void> {
@ -45,5 +44,5 @@ export async function adbSyncWriteRequest(
data: value,
});
}
await stream.write(buffer);
await writer.write(buffer);
}

View file

@ -1,5 +1,6 @@
import Struct, { placeholder } from '@yume-chan/struct';
import { AdbBufferedStream } from '../../stream';
import { WritableStreamDefaultWriter } from "../../utils";
import { AdbSyncRequestId, adbSyncWriteRequest } from './request';
import { adbSyncReadResponse, AdbSyncResponseId } from './response';
@ -94,6 +95,7 @@ const Lstat2ResponseType = {
export async function adbSyncLstat(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
path: string,
v2: boolean,
): Promise<AdbSyncLstatResponse | AdbSyncStatResponse> {
@ -108,14 +110,15 @@ export async function adbSyncLstat(
responseType = LstatResponseType;
}
await adbSyncWriteRequest(stream, requestId, path);
await adbSyncWriteRequest(writer, requestId, path);
return adbSyncReadResponse(stream, responseType);
}
export async function adbSyncStat(
stream: AdbBufferedStream,
writer: WritableStreamDefaultWriter<ArrayBuffer>,
path: string,
): Promise<AdbSyncStatResponse> {
await adbSyncWriteRequest(stream, AdbSyncRequestId.Stat, path);
await adbSyncWriteRequest(writer, AdbSyncRequestId.Stat, path);
return adbSyncReadResponse(stream, StatResponseType);
}

View file

@ -3,17 +3,32 @@ import { Adb } from '../../adb';
import { AdbFeatures } from '../../features';
import { AdbSocket } from '../../socket';
import { AdbBufferedStream } from '../../stream';
import { AutoResetEvent } from '../../utils';
import { AutoResetEvent, QueuingStrategy, ReadableStream, TransformStream, WritableStream, WritableStreamDefaultWriter } from '../../utils';
import { AdbSyncEntryResponse, adbSyncOpenDir } from './list';
import { adbSyncPull } from './pull';
import { adbSyncPush } from './push';
import { adbSyncLstat, adbSyncStat } from './stat';
class LockTransformStream<T> extends TransformStream<T, T>{
constructor(lock: AutoResetEvent, writableStrategy?: QueuingStrategy<T>, readableStrategy?: QueuingStrategy<T>) {
super({
start() {
return lock.wait();
},
flush() {
lock.notify();
}
}, writableStrategy, readableStrategy);
}
}
export class AdbSync extends AutoDisposable {
protected adb: Adb;
protected stream: AdbBufferedStream;
protected writer: WritableStreamDefaultWriter<ArrayBuffer>;
protected sendLock = this.addDisposable(new AutoResetEvent());
public get supportsStat(): boolean {
@ -25,13 +40,14 @@ export class AdbSync extends AutoDisposable {
this.adb = adb;
this.stream = new AdbBufferedStream(socket);
this.writer = socket.writable.getWriter();
}
public async lstat(path: string) {
await this.sendLock.wait();
try {
return adbSyncLstat(this.stream, path, this.supportsStat);
return adbSyncLstat(this.stream, this.writer, path, this.supportsStat);
} finally {
this.sendLock.notify();
}
@ -45,7 +61,7 @@ export class AdbSync extends AutoDisposable {
await this.sendLock.wait();
try {
return adbSyncStat(this.stream, path);
return adbSyncStat(this.stream, this.writer, path);
} finally {
this.sendLock.notify();
}
@ -66,7 +82,7 @@ export class AdbSync extends AutoDisposable {
await this.sendLock.wait();
try {
yield* adbSyncOpenDir(this.stream, path);
yield* adbSyncOpenDir(this.stream, this.writer, path);
} finally {
this.sendLock.notify();
}
@ -80,34 +96,45 @@ export class AdbSync extends AutoDisposable {
return results;
}
public async *read(filename: string): AsyncGenerator<ArrayBuffer, void, void> {
await this.sendLock.wait();
try {
yield* adbSyncPull(this.stream, filename);
} finally {
this.sendLock.notify();
}
public read(filename: string, highWaterMark = 16 * 1024): ReadableStream<ArrayBuffer> {
const readable = adbSyncPull(this.stream, this.writer, filename, highWaterMark);
return readable.pipeThrough(new LockTransformStream(
this.sendLock,
{ highWaterMark, size(chunk) { return chunk.byteLength; } },
{ highWaterMark, size(chunk) { return chunk.byteLength; } }
));
}
public async write(
public write(
filename: string,
content: ArrayLike<number> | ArrayBufferLike | AsyncIterable<ArrayBuffer>,
mode?: number,
mtime?: number,
onProgress?: (uploaded: number) => void,
): Promise<void> {
await this.sendLock.wait();
highWaterMark = 16 * 1024,
): WritableStream<ArrayBuffer> {
const lockStream = new LockTransformStream<ArrayBuffer>(
this.sendLock,
{ highWaterMark, size(chunk) { return chunk.byteLength; } },
{ highWaterMark, size(chunk) { return chunk.byteLength; } }
);
try {
await adbSyncPush(this.stream, filename, content, mode, mtime, undefined, onProgress);
} finally {
this.sendLock.notify();
}
const writable = adbSyncPush(
this.stream,
this.writer,
filename,
mode,
mtime,
undefined,
onProgress
);
lockStream.readable.pipeTo(writable);
return lockStream.writable;
}
public override dispose() {
super.dispose();
this.stream.close();
this.writer.close();
}
}

View file

@ -1,6 +1,6 @@
import Struct from '@yume-chan/struct';
import { AdbBackend } from './backend';
import { BufferedStream } from './stream';
import { TransformStream, TransformStreamDefaultController, WritableStreamDefaultWriter } from "./utils";
export enum AdbCommand {
Auth = 0x48545541, // 'AUTH'
@ -29,36 +29,50 @@ export type AdbPacket = typeof AdbPacketStruct['TDeserializeResult'];
export type AdbPacketInit = Omit<typeof AdbPacketStruct['TInit'], 'checksum' | 'magic'>;
export namespace AdbPacket {
export async function read(backend: AdbBackend): Promise<AdbPacket> {
// Detect boundary
// Note that it relies on the backend to only return data from one write operation
let buffer: ArrayBuffer;
do {
// Maybe it's a payload from last connection.
// Ignore and try again
buffer = await backend.read(24);
} while (buffer.byteLength !== 24);
export class AdbPacketStream extends TransformStream<ArrayBuffer, AdbPacket> {
private _passthrough = new TransformStream<ArrayBuffer, ArrayBuffer>({});
private _passthroughWriter = this._passthrough.writable.getWriter();
private _buffered = new BufferedStream(this._passthrough.readable);
private _controller!: TransformStreamDefaultController<AdbPacket>;
private _closed = false;
let bufferUsed = false;
const stream = new BufferedStream({
async read(length: number) {
if (!bufferUsed) {
bufferUsed = true;
return buffer;
}
return backend.read(length);
}
public constructor() {
super({
start: (controller) => {
this._controller = controller;
this.receiveLoop();
},
transform: (chunk) => {
this._passthroughWriter.write(chunk);
},
flush: () => {
this._closed = true;
this._passthroughWriter.close();
},
});
return AdbPacketStruct.deserialize(stream);
}
export async function write(
private async receiveLoop() {
try {
while (true) {
const packet = await AdbPacketStruct.deserialize(this._buffered);
this._controller.enqueue(packet);
}
} catch (e) {
if (this._closed) {
return;
}
this._controller.error(e);
}
}
}
export async function writeAdbPacket(
init: AdbPacketInit,
calculateChecksum: boolean,
backend: AdbBackend
): Promise<void> {
writer: WritableStreamDefaultWriter<ArrayBuffer>
): Promise<void> {
let checksum: number;
if (calculateChecksum && init.payload) {
const array = new Uint8Array(init.payload);
@ -76,9 +90,8 @@ export namespace AdbPacket {
// Write payload separately to avoid an extra copy
const header = AdbPacketHeader.serialize(packet);
await backend.write(header);
await writer.write(header);
if (packet.payload.byteLength) {
await backend.write(packet.payload);
}
await writer.write(packet.payload);
}
}

View file

@ -1,28 +0,0 @@
import { EventEmitter, EventListenerInfo, RemoveEventListener } from '@yume-chan/event';
const NoopRemoveEventListener: RemoveEventListener = () => { };
NoopRemoveEventListener.dispose = NoopRemoveEventListener;
export class CloseEventEmitter extends EventEmitter<void, void>{
private closed = false;
protected override addEventListener(info: EventListenerInfo<void, void>) {
if (this.closed) {
info.listener.apply(info.thisArg, [undefined, ...info.args]);
return NoopRemoveEventListener;
} else {
return super.addEventListener(info);
}
}
public override fire() {
super.fire();
this.closed = true;
this.listeners.length = 0;
}
public override dispose() {
this.fire();
super.dispose();
}
}

View file

@ -1,8 +1,8 @@
import { AsyncOperationManager } from '@yume-chan/async';
import { AutoDisposable, EventEmitter } from '@yume-chan/event';
import { AdbBackend } from '../backend';
import { AdbCommand, AdbPacket, AdbPacketInit } from '../packet';
import { AutoResetEvent, decodeUtf8, encodeUtf8 } from '../utils';
import { AdbCommand, AdbPacket, AdbPacketInit, AdbPacketStream, writeAdbPacket } from '../packet';
import { AbortController, AutoResetEvent, decodeUtf8, encodeUtf8, WritableStreamDefaultWriter } from '../utils';
import { AdbSocketController } from './controller';
import { AdbLogger } from './logger';
import { AdbSocket } from './socket';
@ -34,6 +34,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
private readonly logger: AdbLogger | undefined;
public readonly backend: AdbBackend;
private _backendWriter!: WritableStreamDefaultWriter<ArrayBuffer>;
public maxPayloadSize = 0;
public calculateChecksum = true;
@ -50,6 +51,9 @@ export class AdbPacketDispatcher extends AutoDisposable {
private _running = false;
public get running() { return this._running; }
private _runningAbortController!: AbortController;
private _packetStream!: AdbPacketStream;
public constructor(backend: AdbBackend, logger?: AdbLogger) {
super();
@ -60,8 +64,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
private async receiveLoop() {
try {
while (this._running) {
const packet = await AdbPacket.read(this.backend);
for await (const packet of this._packetStream.readable) {
this.logger?.onIncomingPacket?.(packet);
switch (packet.command) {
@ -73,7 +76,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
continue;
case AdbCommand.Write:
if (this.sockets.has(packet.arg1)) {
await this.sockets.get(packet.arg1)!.dataEvent.fire(packet.payload!);
await this.sockets.get(packet.arg1)!.enqueue(packet.payload!);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
}
@ -97,7 +100,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
}
} catch (e) {
if (!this._running) {
// ignore error
// ignore error when not running
return;
}
@ -188,7 +191,18 @@ export class AdbPacketDispatcher extends AutoDisposable {
}
public start() {
this._backendWriter = this.backend.writable!.getWriter();
this._running = true;
this._runningAbortController = new AbortController();
this._packetStream = new AdbPacketStream();
this.backend.readable!.pipeTo(this._packetStream.writable, {
preventAbort: true,
preventCancel: true,
signal: this._runningAbortController.signal,
});
this.receiveLoop();
}
@ -250,7 +264,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
this.logger?.onOutgoingPacket?.(init);
await AdbPacket.write(init, this.calculateChecksum, this.backend);
await writeAdbPacket(init, this.calculateChecksum, this._backendWriter);
} finally {
this.sendLock.notify();
}
@ -258,6 +272,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
public override dispose() {
this._running = false;
this._runningAbortController.abort();
for (const socket of this.sockets.values()) {
socket.dispose();

View file

@ -1,4 +1,3 @@
export * from './close-event-emitter';
export * from './controller';
export * from './data-event-emitter';
export * from './dispatcher';

View file

@ -1,36 +1,17 @@
import { StructAsyncDeserializeStream, ValueOrPromise } from '@yume-chan/struct';
import { StructAsyncDeserializeStream } from '@yume-chan/struct';
import { AdbSocket, AdbSocketInfo } from '../socket';
import { AdbSocketStream } from './stream';
import { ReadableStream, ReadableStreamDefaultReader } from '../utils';
export class StreamEndedError extends Error {
public constructor() {
super('Stream ended');
// Fix Error's prototype chain when compiling to ES5
Object.setPrototypeOf(this, new.target.prototype);
}
}
export interface Stream {
/**
* When the stream is ended (no more data can be read),
* An `StreamEndedError` should be thrown.
*
* @param length A hint of how much data should be read.
* @returns Data, which can be either more or less than `length`.
*/
read(length: number): ValueOrPromise<ArrayBuffer>;
close?(): void;
}
export class BufferedStream<T extends Stream> {
export class BufferedStream {
private buffer: Uint8Array | undefined;
protected readonly stream: T;
protected readonly stream: ReadableStream<ArrayBuffer>;
public constructor(stream: T) {
protected readonly reader: ReadableStreamDefaultReader<ArrayBuffer>;
public constructor(stream: ReadableStream<ArrayBuffer>) {
this.stream = stream;
this.reader = stream.getReader();
}
/**
@ -54,66 +35,76 @@ export class BufferedStream<T extends Stream> {
index = buffer.byteLength;
this.buffer = undefined;
} else {
const buffer = await this.stream.read(length);
if (buffer.byteLength === length) {
return buffer;
const result = await this.reader.read();
if (result.done) {
if (readToEnd) {
return new ArrayBuffer(0);
} else {
throw new Error('Unexpected end of stream');
}
}
if (buffer.byteLength > length) {
this.buffer = new Uint8Array(buffer, length);
return buffer.slice(0, length);
const { value } = result;
if (value.byteLength === length) {
return value;
}
if (value.byteLength > length) {
this.buffer = new Uint8Array(value, length);
return value.slice(0, length);
}
array = new Uint8Array(length);
array.set(new Uint8Array(buffer), 0);
index = buffer.byteLength;
array.set(new Uint8Array(value), 0);
index = value.byteLength;
}
try {
while (index < length) {
const left = length - index;
const buffer = await this.stream.read(left);
if (buffer.byteLength > left) {
array.set(new Uint8Array(buffer, 0, left), index);
this.buffer = new Uint8Array(buffer, left);
const result = await this.reader.read();
if (result.done) {
if (readToEnd) {
return new ArrayBuffer(0);
} else {
throw new Error('Unexpected end of stream');
}
}
const { value } = result;
if (value.byteLength > left) {
array.set(new Uint8Array(value, 0, left), index);
this.buffer = new Uint8Array(value, left);
return array.buffer;
}
array.set(new Uint8Array(buffer), index);
index += buffer.byteLength;
}
}
catch (e) {
if (readToEnd && e instanceof StreamEndedError) {
return array.buffer;
}
throw e;
array.set(new Uint8Array(value), index);
index += value.byteLength;
}
return array.buffer;
}
public close() {
this.stream.close?.();
this.reader.cancel();
}
}
export class AdbBufferedStream
extends BufferedStream<AdbSocketStream>
extends BufferedStream
implements AdbSocketInfo, StructAsyncDeserializeStream {
public get backend() { return this.stream.backend; }
public get localId() { return this.stream.localId; }
public get remoteId() { return this.stream.remoteId; }
public get localCreated() { return this.stream.localCreated; }
public get serviceString() { return this.stream.serviceString; }
protected readonly socket: AdbSocket;
public get backend() { return this.socket.backend; }
public get localId() { return this.socket.localId; }
public get remoteId() { return this.socket.remoteId; }
public get localCreated() { return this.socket.localCreated; }
public get serviceString() { return this.socket.serviceString; }
public get writable() { return this.socket.writable; }
public constructor(socket: AdbSocket) {
super(new AdbSocketStream(socket));
}
public write(data: ArrayBuffer): Promise<void> {
return this.stream.write(data);
super(socket.readable);
this.socket = socket;
}
}

View file

@ -1,2 +1 @@
export * from './buffered-stream';
export * from './stream';

View file

@ -1,55 +0,0 @@
import { once } from '@yume-chan/event';
import { ValueOrPromise } from '@yume-chan/struct';
import { AdbSocket, AdbSocketInfo } from '../socket';
import { EventQueue, EventQueueEndedError } from '../utils';
import { StreamEndedError } from "./buffered-stream";
export class AdbSocketStream implements AdbSocketInfo {
private socket: AdbSocket;
private queue: EventQueue<ArrayBuffer>;
public get backend() { return this.socket.backend; }
public get localId() { return this.socket.localId; }
public get remoteId() { return this.socket.remoteId; }
public get localCreated() { return this.socket.localCreated; }
public get serviceString() { return this.socket.serviceString; }
public constructor(socket: AdbSocket) {
this.socket = socket;
this.queue = new EventQueue<ArrayBuffer>({
highWaterMark: 16 * 1024,
});
this.socket.onData((buffer): ValueOrPromise<void> => {
if (!this.queue.enqueue(buffer, buffer.byteLength)) {
return once(this.queue.onDrain);
}
});
this.socket.onClose(() => {
this.queue.end();
});
}
public async read(): Promise<ArrayBuffer> {
try {
return await this.queue.dequeue();
} catch (e) {
if (e instanceof EventQueueEndedError) {
throw new StreamEndedError();
}
throw e;
}
}
public write(data: ArrayBuffer): Promise<void> {
return this.socket.write(data);
}
close(): void {
this.socket.close();
}
}

View file

@ -1,3 +1,44 @@
// cspell: ignore chainable
// cspell: ignore backpressure
// cspell: ignore endregion
//#region borrowed
// from https://github.com/microsoft/TypeScript/blob/38da7c600c83e7b31193a62495239a0fe478cb67/lib/lib.webworker.d.ts#L633 until moved to separate lib
/** A controller object that allows you to abort one or more DOM requests as and when desired. */
export interface AbortController {
/**
* Returns the AbortSignal object associated with this object.
*/
readonly signal: AbortSignal;
/**
* Invoking this method will set this object's AbortSignal's aborted flag and signal to any observers that the associated activity is to be aborted.
*/
abort(): void;
}
/** A signal object that allows you to communicate with a DOM request (such as a Fetch) and abort it if required via an AbortController object. */
export interface AbortSignal {
/**
* Returns true if this AbortSignal's AbortController has signaled to abort, and false otherwise.
*/
readonly aborted: boolean;
}
export let AbortController: {
prototype: AbortController;
new(): AbortController;
};
export let AbortSignal: {
prototype: AbortSignal;
new(): AbortSignal;
// TODO: Add abort() static
};
({ AbortController, AbortSignal } = globalThis as any);
//#endregion borrowed
// https://github.com/microsoft/TypeScript-DOM-lib-generator/blob/11d922f302743cb3fcee9ab59b03d40074a2965c/baselines/dom.generated.d.ts#L1194-L1206
export interface QueuingStrategy<T = any> {
highWaterMark?: number;
@ -241,8 +282,33 @@ export type ReadableStreamDefaultReadResult<T> = ReadableStreamDefaultReadValueR
export type ReadableStreamReader<T> = ReadableStreamDefaultReader<T>;
// Extra
export interface ReadableStreamIteratorOptions {
preventCancel?: boolean;
}
export interface ReadableStream<R> {
[Symbol.asyncIterator](): AsyncIterableIterator<R>;
values(options?: ReadableStreamIteratorOptions): AsyncIterableIterator<R>;
}
async function* values(this: ReadableStream, options?: ReadableStreamIteratorOptions) {
let reader = this.getReader();
try {
while (true) {
const result = await reader.read();
if (result.done) {
reader.releaseLock();
return;
}
yield result.value;
}
} finally {
if (!options?.preventCancel) {
reader.cancel();
}
}
}
try {
@ -262,15 +328,21 @@ try {
ReadableStream,
ReadableStreamDefaultController,
ReadableStreamDefaultReader,
// @ts-expect-error `@types/node` is slightly different from `@types/web`
TransformStream,
TransformStreamDefaultController,
// @ts-expect-error `@types/node` is slightly different from `@types/web`
WritableStream,
WritableStreamDefaultController,
WritableStreamDefaultWriter,
// @ts-expect-error
} = await import('stream/web'));
}
if (!(Symbol.asyncIterator in ReadableStream.prototype)) {
ReadableStream.prototype[Symbol.asyncIterator] = values;
}
if (!('values' in ReadableStream.prototype)) {
ReadableStream.prototype.values = values;
}
} catch {
throw new Error('Web Streams API is not available');
}