feat(adb): change how to close a socket

This commit is contained in:
Simon Chan 2023-10-16 13:23:59 +08:00
parent 1aa7a92d2c
commit e45fb2ed55
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
6 changed files with 157 additions and 132 deletions

View file

@ -71,6 +71,10 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
this.#combiner.flush(); this.#combiner.flush();
this.#socketLock.notifyOne(); this.#socketLock.notifyOne();
} }
async close() {
await this.#readable.cancel();
}
} }
export class AdbSyncSocket { export class AdbSyncSocket {
@ -94,6 +98,7 @@ export class AdbSyncSocket {
} }
async close() { async close() {
await this.#locked.close();
await this.#socket.close(); await this.#socket.close();
} }
} }

View file

@ -1,4 +1,8 @@
import { AsyncOperationManager, PromiseResolver } from "@yume-chan/async"; import {
AsyncOperationManager,
PromiseResolver,
delay,
} from "@yume-chan/async";
import type { import type {
Consumable, Consumable,
ReadableWritablePair, ReadableWritablePair,
@ -12,7 +16,7 @@ import {
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct"; import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";
import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js"; import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js";
import { decodeUtf8, encodeUtf8, unreachable } from "../utils/index.js"; import { decodeUtf8, encodeUtf8 } from "../utils/index.js";
import type { AdbPacketData, AdbPacketInit } from "./packet.js"; import type { AdbPacketData, AdbPacketInit } from "./packet.js";
import { AdbCommand, calculateChecksum } from "./packet.js"; import { AdbCommand, calculateChecksum } from "./packet.js";
@ -80,18 +84,18 @@ export class AdbPacketDispatcher implements Closeable {
new WritableStream({ new WritableStream({
write: async (packet) => { write: async (packet) => {
switch (packet.command) { switch (packet.command) {
case AdbCommand.OK:
this.#handleOk(packet);
break;
case AdbCommand.Close: case AdbCommand.Close:
await this.#handleClose(packet); await this.#handleClose(packet);
break; break;
case AdbCommand.Write: case AdbCommand.Okay:
await this.#handleWrite(packet); this.#handleOkay(packet);
break; break;
case AdbCommand.Open: case AdbCommand.Open:
await this.#handleOpen(packet); await this.#handleOpen(packet);
break; break;
case AdbCommand.Write:
await this.#handleWrite(packet);
break;
default: default:
// Junk data may only appear in the authentication phase, // Junk data may only appear in the authentication phase,
// since the dispatcher only works after authentication, // since the dispatcher only works after authentication,
@ -125,24 +129,6 @@ export class AdbPacketDispatcher implements Closeable {
this.#writer = connection.writable.getWriter(); this.#writer = connection.writable.getWriter();
} }
#handleOk(packet: AdbPacketData) {
if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
// Device successfully created the socket
return;
}
const socket = this.#sockets.get(packet.arg1);
if (socket) {
// Device has received last `WRTE` to the socket
socket.ack();
return;
}
// Maybe the device is responding to a packet of last connection
// Tell the device to close the socket
void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
}
async #handleClose(packet: AdbPacketData) { async #handleClose(packet: AdbPacketData) {
// If the socket is still pending // If the socket is still pending
if ( if (
@ -170,15 +156,8 @@ export class AdbPacketDispatcher implements Closeable {
// Ignore `arg0` and search for the socket // Ignore `arg0` and search for the socket
const socket = this.#sockets.get(packet.arg1); const socket = this.#sockets.get(packet.arg1);
if (socket) { if (socket) {
// The device want to close the socket await socket.close();
if (!socket.closed) { socket.dispose();
await this.sendPacket(
AdbCommand.Close,
packet.arg1,
packet.arg0,
);
}
await socket.dispose();
this.#sockets.delete(packet.arg1); this.#sockets.delete(packet.arg1);
return; return;
} }
@ -188,27 +167,22 @@ export class AdbPacketDispatcher implements Closeable {
// the device may also respond with two `CLSE` packets. // the device may also respond with two `CLSE` packets.
} }
async #handleWrite(packet: AdbPacketData) { #handleOkay(packet: AdbPacketData) {
const socket = this.#sockets.get(packet.arg1); if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
if (!socket) { // Device successfully created the socket
throw new Error(`Unknown local socket id: ${packet.arg1}`); return;
} }
await socket.enqueue(packet.payload); const socket = this.#sockets.get(packet.arg1);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0); if (socket) {
return; // Device has received last `WRTE` to the socket
} socket.ack();
return;
}
addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) { // Maybe the device is responding to a packet of last connection
this.#incomingSocketHandlers.set(service, handler); // Tell the device to close the socket
} void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
removeReverseTunnel(address: string) {
this.#incomingSocketHandlers.delete(address);
}
clearReverseTunnels() {
this.#incomingSocketHandlers.clear();
} }
async #handleOpen(packet: AdbPacketData) { async #handleOpen(packet: AdbPacketData) {
@ -240,12 +214,41 @@ export class AdbPacketDispatcher implements Closeable {
try { try {
await handler(controller.socket); await handler(controller.socket);
this.#sockets.set(localId, controller); this.#sockets.set(localId, controller);
await this.sendPacket(AdbCommand.OK, localId, remoteId); await this.sendPacket(AdbCommand.Okay, localId, remoteId);
} catch (e) { } catch (e) {
await this.sendPacket(AdbCommand.Close, 0, remoteId); await this.sendPacket(AdbCommand.Close, 0, remoteId);
} }
} }
async #handleWrite(packet: AdbPacketData) {
const socket = this.#sockets.get(packet.arg1);
if (!socket) {
throw new Error(`Unknown local socket id: ${packet.arg1}`);
}
let handled = false;
await Promise.race([
delay(5000).then(() => {
if (!handled) {
throw new Error(
`packet for \`${socket.service}\` not handled in 5 seconds`,
);
}
}),
(async () => {
await socket.enqueue(packet.payload);
await this.sendPacket(
AdbCommand.Okay,
packet.arg1,
packet.arg0,
);
handled = true;
})(),
]);
return;
}
async createSocket(service: string): Promise<AdbSocket> { async createSocket(service: string): Promise<AdbSocket> {
if (this.options.appendNullToServiceString) { if (this.options.appendNullToServiceString) {
service += "\0"; service += "\0";
@ -268,6 +271,18 @@ export class AdbPacketDispatcher implements Closeable {
return controller.socket; return controller.socket;
} }
addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) {
this.#incomingSocketHandlers.set(service, handler);
}
removeReverseTunnel(address: string) {
this.#incomingSocketHandlers.delete(address);
}
clearReverseTunnels() {
this.#incomingSocketHandlers.clear();
}
async sendPacket( async sendPacket(
command: AdbCommand, command: AdbCommand,
arg0: number, arg0: number,
@ -306,7 +321,7 @@ export class AdbPacketDispatcher implements Closeable {
this.#closed = true; this.#closed = true;
this.#readAbortController.abort(); this.#readAbortController.abort();
if (this.options.preserveConnection ?? false) { if (this.options.preserveConnection) {
this.#writer.releaseLock(); this.#writer.releaseLock();
} else { } else {
await this.#writer.close(); await this.#writer.close();
@ -317,7 +332,7 @@ export class AdbPacketDispatcher implements Closeable {
#dispose() { #dispose() {
for (const socket of this.#sockets.values()) { for (const socket of this.#sockets.values()) {
socket.dispose().catch(unreachable); socket.dispose();
} }
this.#disconnected.resolve(); this.#disconnected.resolve();

View file

@ -5,7 +5,7 @@ export enum AdbCommand {
Auth = 0x48545541, // 'AUTH' Auth = 0x48545541, // 'AUTH'
Close = 0x45534c43, // 'CLSE' Close = 0x45534c43, // 'CLSE'
Connect = 0x4e584e43, // 'CNXN' Connect = 0x4e584e43, // 'CNXN'
OK = 0x59414b4f, // 'OKAY' Okay = 0x59414b4f, // 'OKAY'
Open = 0x4e45504f, // 'OPEN' Open = 0x4e45504f, // 'OPEN'
Write = 0x45545257, // 'WRTE' Write = 0x45545257, // 'WRTE'
} }

View file

@ -5,16 +5,15 @@ import type {
PushReadableStreamController, PushReadableStreamController,
ReadableStream, ReadableStream,
WritableStream, WritableStream,
WritableStreamDefaultController,
} from "@yume-chan/stream-extra"; } from "@yume-chan/stream-extra";
import { import {
ConsumableWritableStream, ConsumableWritableStream,
DistributionStream,
DuplexStreamFactory,
PushReadableStream, PushReadableStream,
pipeFrom,
} from "@yume-chan/stream-extra"; } from "@yume-chan/stream-extra";
import type { AdbSocket } from "../adb.js"; import type { AdbSocket } from "../adb.js";
import { raceSignal } from "../server/index.js";
import type { AdbPacketDispatcher } from "./dispatcher.js"; import type { AdbPacketDispatcher } from "./dispatcher.js";
import { AdbCommand } from "./packet.js"; import { AdbCommand } from "./packet.js";
@ -44,8 +43,6 @@ export class AdbDaemonSocketController
readonly localCreated!: boolean; readonly localCreated!: boolean;
readonly service!: string; readonly service!: string;
#duplex: DuplexStreamFactory<Uint8Array, Consumable<Uint8Array>>;
#readable: ReadableStream<Uint8Array>; #readable: ReadableStream<Uint8Array>;
#readableController!: PushReadableStreamController<Uint8Array>; #readableController!: PushReadableStreamController<Uint8Array>;
get readable() { get readable() {
@ -53,16 +50,14 @@ export class AdbDaemonSocketController
} }
#writePromise: PromiseResolver<void> | undefined; #writePromise: PromiseResolver<void> | undefined;
#writableController!: WritableStreamDefaultController;
readonly writable: WritableStream<Consumable<Uint8Array>>; readonly writable: WritableStream<Consumable<Uint8Array>>;
#closed = false; #closed = false;
/**
* Whether the socket is half-closed (i.e. the local side initiated the close). #closedPromise = new PromiseResolver<void>();
*
* It's only used by dispatcher to avoid sending another `CLSE` packet to remote.
*/
get closed() { get closed() {
return this.#closed; return this.#closedPromise.promise;
} }
#socket: AdbDaemonSocket; #socket: AdbDaemonSocket;
@ -77,66 +72,44 @@ export class AdbDaemonSocketController
this.localCreated = options.localCreated; this.localCreated = options.localCreated;
this.service = options.service; this.service = options.service;
// Check this image to help you understand the stream graph this.#readable = new PushReadableStream((controller) => {
// cspell: disable-next-line this.#readableController = controller;
// https://www.plantuml.com/plantuml/png/TL0zoeGm4ErpYc3l5JxyS0yWM6mX5j4C6p4cxcJ25ejttuGX88ZftizxUKmJI275pGhXl0PP_UkfK_CAz5Z2hcWsW9Ny2fdU4C1f5aSchFVxA8vJjlTPRhqZzDQMRB7AklwJ0xXtX0ZSKH1h24ghoKAdGY23FhxC4nS2pDvxzIvxb-8THU0XlEQJ-ZB7SnXTAvc_LhOckhMdLBnbtndpb-SB7a8q2SRD_W00
this.#duplex = new DuplexStreamFactory<
Uint8Array,
Consumable<Uint8Array>
>({
close: async () => {
this.#closed = true;
await this.#dispatcher.sendPacket(
AdbCommand.Close,
this.localId,
this.remoteId,
);
// Don't `dispose` here, we need to wait for `CLSE` response packet.
return false;
},
dispose: () => {
// Error out the pending writes
this.#writePromise?.reject(new Error("Socket closed"));
},
}); });
this.#readable = this.#duplex.wrapReadable( this.writable = new ConsumableWritableStream<Uint8Array>({
new PushReadableStream( start: (controller) => {
(controller) => { this.#writableController = controller;
this.#readableController = controller; },
}, write: async (data, controller) => {
{ highWaterMark: 0 }, const size = data.length;
), const chunkSize = this.#dispatcher.options.maxPayloadSize;
); for (
let start = 0, end = chunkSize;
this.writable = pipeFrom( start < size;
this.#duplex.createWritable( start = end, end += chunkSize
new ConsumableWritableStream<Uint8Array>({ ) {
write: async (chunk) => { this.#writePromise = new PromiseResolver();
// Wait for an ack packet await this.#dispatcher.sendPacket(
this.#writePromise = new PromiseResolver(); AdbCommand.Write,
await this.#dispatcher.sendPacket( this.localId,
AdbCommand.Write, this.remoteId,
this.localId, data.subarray(start, end),
this.remoteId, );
chunk, // Wait for ack packet
); await raceSignal(
await this.#writePromise.promise; () => this.#writePromise!.promise,
}, controller.signal,
}), );
), }
new DistributionStream(this.#dispatcher.options.maxPayloadSize), },
); });
this.#socket = new AdbDaemonSocket(this); this.#socket = new AdbDaemonSocket(this);
} }
async enqueue(data: Uint8Array) { async enqueue(data: Uint8Array) {
// Consumer may abort the `ReadableStream` to close the socket, // Consumers can `cancel` the `readable` if they are not interested in future data.
// it's OK to throw away further packets in this case. // Throw away the data if that happens.
if (this.#readableController.abortSignal.aborted) { if (this.#readableController.abortSignal.aborted) {
return; return;
} }
@ -149,11 +122,32 @@ export class AdbDaemonSocketController
} }
async close(): Promise<void> { async close(): Promise<void> {
await this.#duplex.close(); if (this.#closed) {
return;
}
this.#closed = true;
try {
this.#writableController.error(new Error("Socket closed"));
} catch {
// ignore
}
await this.#dispatcher.sendPacket(
AdbCommand.Close,
this.localId,
this.remoteId,
);
} }
dispose() { dispose() {
return this.#duplex.dispose(); try {
this.#readableController.close();
} catch {
// ignore
}
this.#closedPromise.resolve();
} }
} }
@ -188,7 +182,7 @@ export class AdbDaemonSocket implements AdbDaemonSocketInfo, AdbSocket {
return this.#controller.writable; return this.#controller.writable;
} }
get closed(): boolean { get closed(): Promise<void> {
return this.#controller.closed; return this.#controller.closed;
} }

View file

@ -154,7 +154,7 @@ export class BufferedReadableStream implements AsyncExactReadable {
} }
} }
cancel(reason?: unknown) { async cancel(reason?: unknown) {
return this.reader.cancel(reason); await this.reader.cancel(reason);
} }
} }

View file

@ -1,6 +1,10 @@
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import type { QueuingStrategy, WritableStreamDefaultWriter } from "./stream.js"; import type {
QueuingStrategy,
WritableStreamDefaultController,
WritableStreamDefaultWriter,
} from "./stream.js";
import { ReadableStream, TransformStream, WritableStream } from "./stream.js"; import { ReadableStream, TransformStream, WritableStream } from "./stream.js";
interface Task { interface Task {
@ -161,8 +165,13 @@ export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
} }
export interface ConsumableWritableStreamSink<T> { export interface ConsumableWritableStreamSink<T> {
start?(): void | PromiseLike<void>; start?(
write?(chunk: T): void | PromiseLike<void>; controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: any): void | PromiseLike<void>; abort?(reason: any): void | PromiseLike<void>;
close?(): void | PromiseLike<void>; close?(): void | PromiseLike<void>;
} }
@ -196,11 +205,13 @@ export class ConsumableWritableStream<T> extends WritableStream<Consumable<T>> {
super( super(
{ {
start() { start(controller) {
return sink.start?.(); return sink.start?.(controller);
}, },
async write(chunk) { async write(chunk, controller) {
await chunk.tryConsume((value) => sink.write?.(value)); await chunk.tryConsume(
(value) => sink.write?.(value, controller),
);
chunk.consume(); chunk.consume();
}, },
abort(reason) { abort(reason) {