feat(adb): improve connection close handling

This commit is contained in:
Simon Chan 2022-05-01 18:31:19 +08:00
parent 704e70a585
commit 32987b5f2b
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
10 changed files with 80 additions and 71 deletions

View file

@ -169,6 +169,14 @@ function _Connect(): JSX.Element | null {
return;
}
function dispose() {
// Adb won't close the streams,
// so manually close them.
try { readable.cancel(); } catch { }
try { writable.close(); } catch { }
globalState.setDevice(undefined, undefined);
}
try {
const device = await Adb.authenticate(
{ readable, writable },
@ -177,20 +185,16 @@ function _Connect(): JSX.Element | null {
);
device.disconnected.then(() => {
globalState.setDevice(undefined, undefined);
dispose();
}, (e) => {
globalState.showErrorDialog(e);
globalState.setDevice(undefined, undefined);
dispose();
});
globalState.setDevice(selectedBackend, device);
} catch (e: any) {
globalState.showErrorDialog(e);
// The streams are still open when Adb authentication failed,
// manually close them to release the device.
readable.cancel();
writable.close();
dispose();
} finally {
setConnecting(false);
}

View file

@ -1,4 +1,4 @@
import { AdbPacketHeader, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, type AdbBackend, type AdbPacketData, type AdbPacketInit, type ReadableWritablePair, type WritableStream } from '@yume-chan/adb';
import { AdbPacketHeader, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, WritableStream, type AdbBackend, type AdbPacketData, type AdbPacketInit, type ReadableWritablePair } from '@yume-chan/adb';
import type { StructDeserializeStream } from "@yume-chan/struct";
export const ADB_DEVICE_FILTER: USBDeviceFilter = {
@ -77,14 +77,14 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair<AdbPacketDat
}));
this._writable = pipeFrom(
factory.createWritable({
factory.createWritable(new WritableStream({
write: async (chunk) => {
await device.transferOut(outEndpoint.endpointNumber, chunk);
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
}),
})),
new AdbPacketSerializeStream()
);
}

View file

@ -1,4 +1,4 @@
import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb';
import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, WritableStream, type AdbBackend } from '@yume-chan/adb';
export default class AdbWsBackend implements AdbBackend {
public readonly serial: string;
@ -42,14 +42,14 @@ export default class AdbWsBackend implements AdbBackend {
size(chunk) { return chunk.byteLength; },
}));
const writable = factory.createWritable({
const writable = factory.createWritable(new WritableStream({
write: (chunk) => {
socket.send(chunk);
},
}, {
highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; },
});
}));
return {
readable: readable.pipeThrough(new StructDeserializeStream(AdbPacket)),

View file

@ -76,6 +76,7 @@ export class Adb implements Closeable {
await writer.write(calculateChecksum(init));
}
let banner: string;
try {
// https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252
// There are some other feature constants, but some of them are only used by ADB server, not devices (daemons).
@ -109,15 +110,16 @@ export class Adb implements Closeable {
payload: encodeUtf8(`host::features=${features};`),
});
const banner = await resolver.promise;
// Stop piping before creating `Adb` object
// Because `AdbPacketDispatcher` will lock the streams when initializing
banner = await resolver.promise;
} finally {
// When failed, release locks on `connection` so the caller can try again.
// When success, also release locks so `AdbPacketDispatcher` can use them.
abortController.abort();
writer.releaseLock();
// Wait until pipe stops (`ReadableStream` lock released)
await pipe;
}
return new Adb(
connection,
@ -125,11 +127,6 @@ export class Adb implements Closeable {
maxPayloadSize,
banner,
);
} catch (e) {
abortController.abort();
writer.releaseLock();
throw e;
}
}
private readonly dispatcher: AdbPacketDispatcher;

View file

@ -175,7 +175,7 @@ export class AdbSync extends AutoDisposable {
public override async dispose() {
super.dispose();
this.stream.close();
await this.stream.close();
await this.writer.close();
}
}

View file

@ -47,6 +47,7 @@ export class AdbPacketDispatcher implements Closeable {
public readonly options: AdbPacketDispatcherOptions;
private _closed = false;
private _disconnected = new PromiseResolver<void>();
public get disconnected() { return this._disconnected.promise; }
@ -90,17 +91,22 @@ export class AdbPacketDispatcher implements Closeable {
},
}), {
// There are multiple reasons for the pipe to stop,
// including device disconnection, protocol error, or user abort,
// (device disconnection, protocol error, or user abortion)
// if the underlying streams are still open,
// it's still possible to create another ADB connection.
// So don't close `readable` here.
preventCancel: false,
preventCancel: true,
signal: this._abortController.signal,
})
.then(() => {
this.dispose();
}, (e) => {
// https://github.com/MattiasBuelens/web-streams-polyfill/issues/115
// `e` is always `AbortError` (instead of what I give in `abortController.abort()`)
// so we can't check if `e` is a real error.
if (!this._closed) {
this._disconnected.reject(e);
}
this.dispose();
});
@ -265,6 +271,7 @@ export class AdbPacketDispatcher implements Closeable {
(init as AdbPacketInit).checksum = 0;
}
await this._writer.ready;
await this._writer.write(init as AdbPacketInit);
}
@ -280,13 +287,11 @@ export class AdbPacketDispatcher implements Closeable {
// Stop receiving
// It's possible that we haven't received all `CLSE` confirm packets,
// but it doesn't matter, the next connection can cope with them.
try {
this._closed = true;
this._abortController.abort();
} catch { }
this._writer.releaseLock();
// Adb connection doesn't have a method to confirm closing,
// so call `dispose` immediately
this.dispose();
// `pipe().then()` will call `dispose`
}
private dispose() {
@ -294,8 +299,6 @@ export class AdbPacketDispatcher implements Closeable {
socket.dispose();
}
this._writer.releaseLock();
this._disconnected.resolve();
}
}

View file

@ -1,7 +1,7 @@
import { PromiseResolver } from "@yume-chan/async";
import type { Disposable } from "@yume-chan/event";
import { AdbCommand } from '../packet.js';
import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair, type WritableStream } from '../stream/index.js';
import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, WritableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair } from '../stream/index.js';
import type { AdbPacketDispatcher, Closeable } from './dispatcher.js';
export interface AdbSocketInfo {
@ -77,7 +77,8 @@ export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<
);
this.writable = pipeFrom(
this._factory.createWritable({
this._factory.createWritable(
new WritableStream({
write: async (chunk) => {
// Wait for an ack packet
this._writePromise = new PromiseResolver();
@ -88,8 +89,9 @@ export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<
chunk
);
await this._writePromise.promise;
},
}
}),
),
new ChunkStream(this.dispatcher.options.maxPayloadSize)
);

View file

@ -123,8 +123,8 @@ export class BufferedStream {
}
}
public close() {
this.reader.cancel();
public async close() {
await this.reader.cancel();
}
}

View file

@ -17,7 +17,7 @@ export interface AbortController {
/**
* Invoking this method will set this object's AbortSignal's aborted flag and signal to any observers that the associated activity is to be aborted.
*/
abort(): void;
abort(reason?: any): void;
}
export let AbortController: {

View file

@ -3,7 +3,7 @@ import type Struct from "@yume-chan/struct";
import type { StructValueType, ValueOrPromise } from "@yume-chan/struct";
import { decodeUtf8 } from "../utils/index.js";
import { BufferedStream, BufferedStreamEndedError } from "./buffered.js";
import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair, type UnderlyingSink } from "./detect.js";
import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair } from "./detect.js";
export interface DuplexStreamFactoryOptions {
close?: (() => ValueOrPromise<boolean | void>) | undefined;
@ -19,6 +19,7 @@ export interface DuplexStreamFactoryOptions {
*/
export class DuplexStreamFactory<R, W> {
private readableControllers: ReadableStreamDefaultController<R>[] = [];
private writers: WritableStreamDefaultWriter<W>[] = [];
private _writableClosed = false;
public get writableClosed() { return this._writableClosed; }
@ -49,29 +50,26 @@ export class DuplexStreamFactory<R, W> {
});
}
public createWritable(sink: UnderlyingSink<W>, strategy?: QueuingStrategy<W>): WritableStream<W> {
public createWritable(stream: WritableStream<W>): WritableStream<W> {
const writer = stream.getWriter();
this.writers.push(writer);
// `WritableStream` has no way to tell if the remote peer has closed the connection.
// So it only triggers `close`.
return new WritableStream<W>({
start: async (controller) => {
await sink.start?.(controller);
},
write: async (chunk, controller) => {
if (this._writableClosed) {
throw new Error("stream is closed");
}
await sink.write?.(chunk, controller);
write: async (chunk) => {
await writer.ready;
await writer.write(chunk);
},
abort: async (reason) => {
await sink.abort?.(reason);
await writer.abort(reason);
await this.close();
},
close: async () => {
await sink.close?.();
try { await writer.close(); } catch { }
await this.close();
},
}, strategy);
});
}
public async close() {
@ -79,10 +77,15 @@ export class DuplexStreamFactory<R, W> {
return;
}
this._writableClosed = true;
if (await this.options.close?.() !== false) {
// `close` can return `false` to disable automatic `dispose`.
await this.dispose();
}
for (const writer of this.writers) {
try { await writer.close(); } catch { }
}
}
public async dispose() {