feat(adb): buffer writes in sync push

This commit is contained in:
Simon Chan 2023-03-05 21:51:59 +08:00
parent 228d2ab7d3
commit 1fb86f5eed
11 changed files with 308 additions and 229 deletions

View file

@ -55,6 +55,7 @@ export class Adb implements Closeable {
): Promise<Adb> {
// Initially, set to highest-supported version and payload size.
let version = 0x01000001;
// Android 4: 4K, Android 7: 256K, Android 9: 1M
let maxPayloadSize = 0x100000;
const resolver = new PromiseResolver<string>();
@ -167,11 +168,16 @@ export class Adb implements Closeable {
return this.dispatcher.disconnected;
}
private _protocolVersion: number | undefined;
private _protocolVersion: number;
public get protocolVersion() {
return this._protocolVersion;
}
private _maxPayloadSize: number;
public get maxPayloadSize() {
return this._maxPayloadSize;
}
private _product: string | undefined;
public get product() {
return this._product;
@ -222,6 +228,7 @@ export class Adb implements Closeable {
});
this._protocolVersion = version;
this._maxPayloadSize = maxPayloadSize;
this.subprocess = new AdbSubprocess(this);
this.power = new AdbPower(this);

View file

@ -3,5 +3,6 @@ export * from "./pull.js";
export * from "./push.js";
export * from "./request.js";
export * from "./response.js";
export * from "./socket.js";
export * from "./stat.js";
export * from "./sync.js";

View file

@ -1,11 +1,8 @@
import type {
BufferedReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import Struct from "@yume-chan/struct";
import { AdbSyncRequestId, adbSyncWriteRequest } from "./request.js";
import { AdbSyncResponseId, adbSyncReadResponses } from "./response.js";
import type { AdbSyncSocket } from "./socket.js";
import type { AdbSyncStat } from "./stat.js";
import { AdbSyncLstatResponse, AdbSyncStatResponse } from "./stat.js";
@ -31,16 +28,15 @@ export const AdbSyncEntry2Response = new Struct({ littleEndian: true })
export type AdbSyncEntry2Response =
(typeof AdbSyncEntry2Response)["TDeserializeResult"];
export async function* adbSyncOpenDir(
stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>,
path: string,
v2: boolean
): AsyncGenerator<AdbSyncEntry, void, void> {
if (v2) {
await adbSyncWriteRequest(writer, AdbSyncRequestId.List2, path);
export async function* adbSyncOpenDirV2(
socket: AdbSyncSocket,
path: string
): AsyncGenerator<AdbSyncEntry2Response, void, void> {
const locked = await socket.lock();
try {
await adbSyncWriteRequest(locked, AdbSyncRequestId.List2, path);
for await (const item of adbSyncReadResponses(
stream,
locked,
AdbSyncResponseId.Entry2,
AdbSyncEntry2Response
)) {
@ -52,13 +48,39 @@ export async function* adbSyncOpenDir(
}
yield item;
}
} else {
await adbSyncWriteRequest(writer, AdbSyncRequestId.List, path);
} finally {
locked.release();
}
}
export async function* adbSyncOpenDirV1(
socket: AdbSyncSocket,
path: string
): AsyncGenerator<AdbSyncEntryResponse, void, void> {
const locked = await socket.lock();
try {
await adbSyncWriteRequest(locked, AdbSyncRequestId.List, path);
for await (const item of adbSyncReadResponses(
stream,
locked,
AdbSyncResponseId.Entry,
AdbSyncEntryResponse
)) {
yield item;
}
} finally {
locked.release();
}
}
export async function* adbSyncOpenDir(
socket: AdbSyncSocket,
path: string,
v2: boolean
): AsyncGenerator<AdbSyncEntry, void, void> {
if (v2) {
yield* adbSyncOpenDirV2(socket, path);
} else {
for await (const item of adbSyncOpenDirV1(socket, path)) {
// Convert to same format as `AdbSyncEntry2Response` for easier consumption.
// However it will add some overhead.
yield {

View file

@ -1,12 +1,10 @@
import type {
BufferedReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import { ReadableStream } from "@yume-chan/stream-extra";
import type { ReadableStream } from "@yume-chan/stream-extra";
import { PushReadableStream } from "@yume-chan/stream-extra";
import Struct from "@yume-chan/struct";
import { AdbSyncRequestId, adbSyncWriteRequest } from "./request.js";
import { AdbSyncResponseId, adbSyncReadResponses } from "./response.js";
import type { AdbSyncSocket } from "./socket.js";
export const AdbSyncDataResponse = new Struct({ littleEndian: true })
.uint32("dataLength")
@ -16,47 +14,44 @@ export const AdbSyncDataResponse = new Struct({ littleEndian: true })
export type AdbSyncDataResponse =
(typeof AdbSyncDataResponse)["TDeserializeResult"];
export function adbSyncPull(
stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>,
export async function* adbSyncPullGenerator(
socket: AdbSyncSocket,
path: string
): ReadableStream<Uint8Array> {
let generator!: AsyncGenerator<AdbSyncDataResponse, void, void>;
return new ReadableStream<Uint8Array>(
{
async start() {
// TODO: If `ReadableStream.from(AsyncGenerator)` is added to spec, use it instead.
await adbSyncWriteRequest(
writer,
AdbSyncRequestId.Receive,
path
);
generator = adbSyncReadResponses(
stream,
): AsyncGenerator<Uint8Array, void, void> {
const locked = await socket.lock();
let done = false;
try {
await adbSyncWriteRequest(locked, AdbSyncRequestId.Receive, path);
for await (const packet of adbSyncReadResponses(
locked,
AdbSyncResponseId.Data,
AdbSyncDataResponse
);
},
async pull(controller) {
const { done, value } = await generator.next();
if (done) {
controller.close();
return;
)) {
yield packet.data;
}
done = true;
} finally {
if (!done) {
// sync pull can't be cancelled, so we have to read all data
for await (const packet of adbSyncReadResponses(
locked,
AdbSyncResponseId.Data,
AdbSyncDataResponse
)) {
void packet;
}
}
locked.release();
}
}
export function adbSyncPull(
socket: AdbSyncSocket,
path: string
): ReadableStream<Uint8Array> {
return new PushReadableStream(async (controller) => {
for await (const data of adbSyncPullGenerator(socket, path)) {
await controller.enqueue(data);
}
controller.enqueue(value.data);
},
cancel() {
generator.return().catch((e) => {
void e;
});
throw new Error(`Sync commands can't be canceled.`);
},
},
{
highWaterMark: 16 * 1024,
size(chunk) {
return chunk.byteLength;
},
}
);
}

View file

@ -1,13 +1,10 @@
import type {
BufferedReadableStream,
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import type { ReadableStream } from "@yume-chan/stream-extra";
import { ChunkStream, WritableStream } from "@yume-chan/stream-extra";
import Struct from "@yume-chan/struct";
import { AdbSyncRequestId, adbSyncWriteRequest } from "./request.js";
import { AdbSyncResponseId, adbSyncReadResponse } from "./response.js";
import type { AdbSyncSocket } from "./socket.js";
import { LinuxFileType } from "./stat.js";
export const AdbSyncOkResponse = new Struct({ littleEndian: true }).uint32(
@ -17,25 +14,37 @@ export const AdbSyncOkResponse = new Struct({ littleEndian: true }).uint32(
export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024;
export async function adbSyncPush(
stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>,
socket: AdbSyncSocket,
filename: string,
file: ReadableStream<Uint8Array>,
mode: number = (LinuxFileType.File << 12) | 0o666,
mtime: number = (Date.now() / 1000) | 0,
packetSize: number = ADB_SYNC_MAX_PACKET_SIZE
) {
const locked = await socket.lock();
try {
const pathAndMode = `${filename},${mode.toString()}`;
await adbSyncWriteRequest(writer, AdbSyncRequestId.Send, pathAndMode);
await adbSyncWriteRequest(locked, AdbSyncRequestId.Send, pathAndMode);
await file.pipeThrough(new ChunkStream(packetSize)).pipeTo(
new WritableStream({
write: async (chunk) => {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Data, chunk);
await adbSyncWriteRequest(
locked,
AdbSyncRequestId.Data,
chunk
);
},
})
);
await adbSyncWriteRequest(writer, AdbSyncRequestId.Done, mtime);
await adbSyncReadResponse(stream, AdbSyncResponseId.Ok, AdbSyncOkResponse);
await adbSyncWriteRequest(locked, AdbSyncRequestId.Done, mtime);
await adbSyncReadResponse(
locked,
AdbSyncResponseId.Ok,
AdbSyncOkResponse
);
} finally {
locked.release();
}
}

View file

@ -1,4 +1,3 @@
import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra";
import Struct from "@yume-chan/struct";
import { encodeUtf8 } from "../../utils/index.js";
@ -23,27 +22,32 @@ export const AdbSyncDataRequest = new Struct({ littleEndian: true })
.fields(AdbSyncNumberRequest)
.uint8Array("data", { lengthField: "arg" });
export interface AdbSyncWritable {
write(buffer: Uint8Array): Promise<void>;
}
export async function adbSyncWriteRequest(
writer: WritableStreamDefaultWriter<Uint8Array>,
writable: AdbSyncWritable,
id: AdbSyncRequestId | string,
value: number | string | Uint8Array
): Promise<void> {
let buffer: Uint8Array;
if (typeof value === "number") {
buffer = AdbSyncNumberRequest.serialize({
const buffer = AdbSyncNumberRequest.serialize({
id,
arg: value,
});
await writable.write(buffer);
} else if (typeof value === "string") {
buffer = AdbSyncDataRequest.serialize({
id,
data: encodeUtf8(value),
});
// Let `writable` buffer writes
const buffer = encodeUtf8(value);
await writable.write(
AdbSyncNumberRequest.serialize({ id, arg: buffer.byteLength })
);
await writable.write(buffer);
} else {
buffer = AdbSyncDataRequest.serialize({
id,
data: value,
});
await writable.write(
AdbSyncNumberRequest.serialize({ id, arg: value.byteLength })
);
await writable.write(value);
}
await writer.write(buffer);
}

View file

@ -1,5 +1,8 @@
import type { BufferedReadableStream } from "@yume-chan/stream-extra";
import type { StructLike, StructValueType } from "@yume-chan/struct";
import type {
StructAsyncDeserializeStream,
StructLike,
StructValueType,
} from "@yume-chan/struct";
import Struct from "@yume-chan/struct";
import { decodeUtf8 } from "../../utils/index.js";
@ -24,7 +27,7 @@ export const AdbSyncFailResponse = new Struct({ littleEndian: true })
});
export async function adbSyncReadResponse<T>(
stream: BufferedReadableStream,
stream: StructAsyncDeserializeStream,
id: AdbSyncResponseId,
type: StructLike<T>
): Promise<T> {
@ -43,7 +46,7 @@ export async function adbSyncReadResponse<T>(
export async function* adbSyncReadResponses<
T extends Struct<object, PropertyKey, object, any>
>(
stream: BufferedReadableStream,
stream: StructAsyncDeserializeStream,
id: AdbSyncResponseId,
type: T
): AsyncGenerator<StructValueType<T>, void, void> {

View file

@ -0,0 +1,95 @@
import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra";
import { BufferedReadableStream } from "@yume-chan/stream-extra";
import type { StructAsyncDeserializeStream } from "@yume-chan/struct";
import type { AdbSocket } from "../../index.js";
import { AutoResetEvent } from "../../index.js";
export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
private _writer: WritableStreamDefaultWriter<Uint8Array>;
private _readable: BufferedReadableStream;
private _bufferSize: number;
private _buffered: Uint8Array[] = [];
private _bufferedLength = 0;
private _lock: AutoResetEvent;
public constructor(
writer: WritableStreamDefaultWriter<Uint8Array>,
readable: BufferedReadableStream,
bufferSize: number,
lock: AutoResetEvent
) {
this._writer = writer;
this._readable = readable;
this._bufferSize = bufferSize;
this._lock = lock;
}
public async flush() {
if (this._bufferedLength === 0) {
return;
}
if (this._buffered.length === 1) {
await this._writer.write(this._buffered[0]!);
this._buffered.length = 0;
this._bufferedLength = 0;
return;
}
const data = new Uint8Array(this._bufferedLength);
let offset = 0;
for (const chunk of this._buffered) {
data.set(chunk, offset);
offset += chunk.byteLength;
}
this._buffered.length = 0;
this._bufferedLength = 0;
// Let AdbSocket chunk the data for us
await this._writer.write(data);
}
public async write(data: Uint8Array) {
this._buffered.push(data);
this._bufferedLength += data.byteLength;
if (this._bufferedLength >= this._bufferSize) {
await this.flush();
}
}
public async read(length: number) {
await this.flush();
return await this._readable.read(length);
}
public release(): void {
this._buffered.length = 0;
this._bufferedLength = 0;
this._lock.notifyOne();
}
}
export class AdbSyncSocket {
private _lock = new AutoResetEvent();
private _socket: AdbSocket;
private _locked: AdbSyncSocketLocked;
public constructor(socket: AdbSocket, bufferSize: number) {
this._socket = socket;
this._locked = new AdbSyncSocketLocked(
socket.writable.getWriter(),
new BufferedReadableStream(socket.readable),
bufferSize,
this._lock
);
}
public async lock() {
await this._lock.wait();
return this._locked;
}
public async close() {
await this._socket.close();
}
}

View file

@ -1,11 +1,8 @@
import type {
BufferedReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import Struct, { placeholder } from "@yume-chan/struct";
import { AdbSyncRequestId, adbSyncWriteRequest } from "./request.js";
import { AdbSyncResponseId, adbSyncReadResponse } from "./response.js";
import type { AdbSyncSocket } from "./socket.js";
// https://github.com/python/cpython/blob/4e581d64b8aff3e2eda99b12f080c877bb78dfca/Lib/stat.py#L36
export enum LinuxFileType {
@ -104,22 +101,23 @@ export type AdbSyncStatResponse =
(typeof AdbSyncStatResponse)["TDeserializeResult"];
export async function adbSyncLstat(
stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>,
socket: AdbSyncSocket,
path: string,
v2: boolean
): Promise<AdbSyncStat> {
const locked = await socket.lock();
try {
if (v2) {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Lstat2, path);
await adbSyncWriteRequest(locked, AdbSyncRequestId.Lstat2, path);
return await adbSyncReadResponse(
stream,
locked,
AdbSyncResponseId.Lstat2,
AdbSyncStatResponse
);
} else {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Lstat, path);
await adbSyncWriteRequest(locked, AdbSyncRequestId.Lstat, path);
const response = await adbSyncReadResponse(
stream,
locked,
AdbSyncResponseId.Lstat,
AdbSyncLstatResponse
);
@ -136,17 +134,24 @@ export async function adbSyncLstat(
},
};
}
} finally {
locked.release();
}
}
export async function adbSyncStat(
stream: BufferedReadableStream,
writer: WritableStreamDefaultWriter<Uint8Array>,
socket: AdbSyncSocket,
path: string
): Promise<AdbSyncStatResponse> {
await adbSyncWriteRequest(writer, AdbSyncRequestId.Stat, path);
const locked = await socket.lock();
try {
await adbSyncWriteRequest(locked, AdbSyncRequestId.Stat, path);
return await adbSyncReadResponse(
stream,
locked,
AdbSyncResponseId.Stat,
AdbSyncStatResponse
);
} finally {
locked.release();
}
}

View file

@ -1,23 +1,16 @@
import { AutoDisposable } from "@yume-chan/event";
import type {
ReadableStream,
WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import {
BufferedReadableStream,
WrapReadableStream,
} from "@yume-chan/stream-extra";
import type { ReadableStream } from "@yume-chan/stream-extra";
import type { Adb } from "../../adb.js";
import { AdbFeature } from "../../features.js";
import type { AdbSocket } from "../../socket/index.js";
import { AutoResetEvent } from "../../utils/index.js";
import { escapeArg } from "../subprocess/index.js";
import type { AdbSyncEntry } from "./list.js";
import { adbSyncOpenDir } from "./list.js";
import { adbSyncPull } from "./pull.js";
import { adbSyncPush } from "./push.js";
import { AdbSyncSocket } from "./socket.js";
import { adbSyncLstat, adbSyncStat } from "./stat.js";
/**
@ -37,55 +30,38 @@ export function dirname(path: string): string {
}
export class AdbSync extends AutoDisposable {
protected adb: Adb;
protected stream: BufferedReadableStream;
// Getting another writer on a locked WritableStream will throw.
// We don't want this behavior on higher-level APIs.
// So we acquire the writer early and use a blocking lock to guard it.
protected writer: WritableStreamDefaultWriter<Uint8Array>;
protected sendLock = this.addDisposable(new AutoResetEvent());
protected _adb: Adb;
protected _socket: AdbSyncSocket;
public get supportsStat(): boolean {
return this.adb.supportsFeature(AdbFeature.StatV2);
return this._adb.supportsFeature(AdbFeature.StatV2);
}
public get supportsList2(): boolean {
return this.adb.supportsFeature(AdbFeature.ListV2);
return this._adb.supportsFeature(AdbFeature.ListV2);
}
public get fixedPushMkdir(): boolean {
return this.adb.supportsFeature(AdbFeature.FixedPushMkdir);
return this._adb.supportsFeature(AdbFeature.FixedPushMkdir);
}
public get needPushMkdirWorkaround(): boolean {
// https://android.googlesource.com/platform/packages/modules/adb/+/91768a57b7138166e0a3d11f79cd55909dda7014/client/file_sync_client.cpp#1361
return (
this.adb.supportsFeature(AdbFeature.ShellV2) && !this.fixedPushMkdir
this._adb.supportsFeature(AdbFeature.ShellV2) &&
!this.fixedPushMkdir
);
}
public constructor(adb: Adb, socket: AdbSocket) {
super();
this.adb = adb;
this.stream = new BufferedReadableStream(socket.readable);
this.writer = socket.writable.getWriter();
this._adb = adb;
this._socket = new AdbSyncSocket(socket, adb.maxPayloadSize);
}
public async lstat(path: string) {
await this.sendLock.wait();
try {
return adbSyncLstat(
this.stream,
this.writer,
path,
this.supportsStat
);
} finally {
this.sendLock.notifyOne();
}
return await adbSyncLstat(this._socket, path, this.supportsStat);
}
public async stat(path: string) {
@ -93,13 +69,7 @@ export class AdbSync extends AutoDisposable {
throw new Error("Not supported");
}
await this.sendLock.wait();
try {
return adbSyncStat(this.stream, this.writer, path);
} finally {
this.sendLock.notifyOne();
}
return await adbSyncStat(this._socket, path);
}
public async isDirectory(path: string): Promise<boolean> {
@ -111,21 +81,8 @@ export class AdbSync extends AutoDisposable {
}
}
public async *opendir(
path: string
): AsyncGenerator<AdbSyncEntry, void, void> {
await this.sendLock.wait();
try {
yield* adbSyncOpenDir(
this.stream,
this.writer,
path,
this.supportsList2
);
} finally {
this.sendLock.notifyOne();
}
public opendir(path: string): AsyncGenerator<AdbSyncEntry, void, void> {
return adbSyncOpenDir(this._socket, path, this.supportsList2);
}
public async readdir(path: string) {
@ -143,15 +100,7 @@ export class AdbSync extends AutoDisposable {
* @returns A `ReadableStream` that reads from the file.
*/
public read(filename: string): ReadableStream<Uint8Array> {
return new WrapReadableStream({
start: async () => {
await this.sendLock.wait();
return adbSyncPull(this.stream, this.writer, filename);
},
close: () => {
this.sendLock.notifyOne();
},
});
return adbSyncPull(this._socket, filename);
}
/**
@ -169,35 +118,22 @@ export class AdbSync extends AutoDisposable {
mode?: number,
mtime?: number
) {
await this.sendLock.wait();
try {
if (this.needPushMkdirWorkaround) {
// It may fail if the path is already existed.
// Ignore the result.
// TODO: sync: test push mkdir workaround (need an Android 8 device)
await this.adb.subprocess.spawnAndWait([
await this._adb.subprocess.spawnAndWait([
"mkdir",
"-p",
escapeArg(dirname(filename)),
]);
}
await adbSyncPush(
this.stream,
this.writer,
filename,
file,
mode,
mtime
);
} finally {
this.sendLock.notifyOne();
}
await adbSyncPush(this._socket, filename, file, mode, mtime);
}
public override async dispose() {
super.dispose();
await this.writer.close();
await this._socket.close();
}
}

View file

@ -81,7 +81,9 @@ export class AdbPacketSerializeStream extends TransformStream<
);
if (init.payload.byteLength) {
// Enqueue payload separately to avoid copying
// USB protocol preserves packet boundaries,
// so we must write payload separately as native ADB does,
// otherwise the read operation on device will fail.
controller.enqueue(init.payload);
}
},