feat: correctly close adb connection

This commit is contained in:
Simon Chan 2022-04-30 21:45:14 +08:00
parent be4dfcd614
commit 7a7f38b3b5
No known key found for this signature in database
GPG key ID: A8B69F750B9BCEDD
14 changed files with 354 additions and 331 deletions

View file

@ -1,5 +1,5 @@
import { DefaultButton, Dialog, Dropdown, IDropdownOption, PrimaryButton, ProgressIndicator, Stack, StackItem } from '@fluentui/react'; import { DefaultButton, Dialog, Dropdown, IDropdownOption, PrimaryButton, ProgressIndicator, Stack, StackItem } from '@fluentui/react';
import { Adb, AdbBackend, InspectStream, pipeFrom } from '@yume-chan/adb'; import { Adb, AdbBackend, AdbPacketData, AdbPacketInit, InspectStream, pipeFrom, ReadableStream, WritableStream } from '@yume-chan/adb';
import AdbDirectSocketsBackend from "@yume-chan/adb-backend-direct-sockets"; import AdbDirectSocketsBackend from "@yume-chan/adb-backend-direct-sockets";
import AdbWebUsbBackend, { AdbWebUsbBackendWatcher } from '@yume-chan/adb-backend-webusb'; import AdbWebUsbBackend, { AdbWebUsbBackendWatcher } from '@yume-chan/adb-backend-webusb';
import AdbWsBackend from '@yume-chan/adb-backend-ws'; import AdbWsBackend from '@yume-chan/adb-backend-ws';
@ -138,49 +138,67 @@ function _Connect(): JSX.Element | null {
}, [updateUsbBackendList]); }, [updateUsbBackendList]);
const connect = useCallback(async () => { const connect = useCallback(async () => {
try { if (!selectedBackend) {
if (selectedBackend) { return;
let device: Adb | undefined; }
try {
setConnecting(true); setConnecting(true);
let readable: ReadableStream<AdbPacketData>;
let writable: WritableStream<AdbPacketInit>;
try {
const streams = await selectedBackend.connect(); const streams = await selectedBackend.connect();
// Use `TransformStream` to intercept packets and log them // Use `InspectStream`s to intercept and log packets
const readable = streams.readable readable = streams.readable
.pipeThrough( .pipeThrough(
new InspectStream(packet => { new InspectStream(packet => {
globalState.appendLog('in', packet); globalState.appendLog('in', packet);
}) })
); );
const writable = pipeFrom(
writable = pipeFrom(
streams.writable, streams.writable,
new InspectStream(packet => { new InspectStream((packet: AdbPacketInit) => {
globalState.appendLog('out', packet); globalState.appendLog('out', packet);
}) })
); );
device = await Adb.authenticate({ readable, writable }, CredentialStore, undefined); } catch (e: any) {
globalState.showErrorDialog(e);
setConnecting(false);
return;
}
try {
const device = await Adb.authenticate(
{ readable, writable },
CredentialStore,
undefined
);
device.disconnected.then(() => { device.disconnected.then(() => {
globalState.setDevice(undefined, undefined); globalState.setDevice(undefined, undefined);
}, (e) => { }, (e) => {
globalState.showErrorDialog(e); globalState.showErrorDialog(e);
globalState.setDevice(undefined, undefined); globalState.setDevice(undefined, undefined);
}); });
globalState.setDevice(selectedBackend, device); globalState.setDevice(selectedBackend, device);
} catch (e) {
device?.dispose();
throw e;
}
}
} catch (e: any) { } catch (e: any) {
globalState.showErrorDialog(e); globalState.showErrorDialog(e);
// The streams are still open when Adb authentication failed,
// manually close them to release the device.
readable.cancel();
writable.close();
} finally { } finally {
setConnecting(false); setConnecting(false);
} }
}, [selectedBackend]); }, [selectedBackend]);
const disconnect = useCallback(async () => { const disconnect = useCallback(async () => {
try { try {
await globalState.device!.dispose(); await globalState.device!.close();
globalState.setDevice(undefined, undefined); globalState.setDevice(undefined, undefined);
} catch (e: any) { } catch (e: any) {
globalState.showErrorDialog(e); globalState.showErrorDialog(e);

View file

@ -1,9 +1,9 @@
import { IconButton, SearchBox, Stack, StackItem } from '@fluentui/react'; import { IconButton, SearchBox, Stack, StackItem } from '@fluentui/react';
import { reaction } from "mobx"; import { action, autorun, makeAutoObservable } from "mobx";
import { observer } from "mobx-react-lite"; import { observer } from "mobx-react-lite";
import { NextPage } from "next"; import { NextPage } from "next";
import Head from "next/head"; import Head from "next/head";
import { useCallback, useEffect, useRef, useState } from 'react'; import { useCallback, useEffect } from 'react';
import 'xterm/css/xterm.css'; import 'xterm/css/xterm.css';
import { ResizeObserver } from '../components'; import { ResizeObserver } from '../components';
import { globalState } from "../state"; import { globalState } from "../state";
@ -15,52 +15,51 @@ if (typeof window !== 'undefined') {
terminal = new AdbTerminal(); terminal = new AdbTerminal();
} }
const UpIconProps = { iconName: Icons.ChevronUp }; const state = makeAutoObservable({
const DownIconProps = { iconName: Icons.ChevronDown }; visible: false,
setVisible(value: boolean) {
this.visible = value;
},
const Shell: NextPage = (): JSX.Element | null => { searchKeyword: '',
const [searchKeyword, setSearchKeyword] = useState(''); setSearchKeyword(value: string) {
const handleSearchKeywordChange = useCallback((e, newValue?: string) => { this.searchKeyword = value;
setSearchKeyword(newValue ?? ''); terminal.searchAddon.findNext(value, { incremental: true });
if (newValue) { },
terminal.searchAddon.findNext(newValue, { incremental: true });
searchPrevious() {
terminal.searchAddon.findPrevious(this.searchKeyword);
},
searchNext() {
terminal.searchAddon.findNext(this.searchKeyword);
} }
}, []); }, {
const findPrevious = useCallback(() => { searchPrevious: action.bound,
terminal.searchAddon.findPrevious(searchKeyword); searchNext: action.bound,
}, [searchKeyword]); });
const findNext = useCallback(() => {
terminal.searchAddon.findNext(searchKeyword);
}, [searchKeyword]);
const connectingRef = useRef(false); autorun(() => {
useEffect(() => {
return reaction(
() => globalState.device,
async () => {
if (!globalState.device) { if (!globalState.device) {
terminal.socket = undefined; terminal.socket = undefined;
return; return;
} }
if (!!terminal.socket || connectingRef.current) { if (!terminal.socket && state.visible) {
return; globalState.device.subprocess.shell()
} .then(action(shell => {
terminal.socket = shell;
try { }), (e) => {
connectingRef.current = true;
const socket = await globalState.device.subprocess.shell();
terminal.socket = socket;
} catch (e: any) {
globalState.showErrorDialog(e); globalState.showErrorDialog(e);
} finally { });
connectingRef.current = false;
} }
}, });
{
fireImmediately: true, const UpIconProps = { iconName: Icons.ChevronUp };
} const DownIconProps = { iconName: Icons.ChevronDown };
);
const Shell: NextPage = (): JSX.Element | null => {
const handleSearchKeywordChange = useCallback((e, value?: string) => {
state.setSearchKeyword(value ?? '');
}, []); }, []);
const handleResize = useCallback(() => { const handleResize = useCallback(() => {
@ -73,6 +72,13 @@ const Shell: NextPage = (): JSX.Element | null => {
} }
}, []); }, []);
useEffect(() => {
state.setVisible(true);
return () => {
state.setVisible(false);
};
}, []);
return ( return (
<Stack {...RouteStackProps}> <Stack {...RouteStackProps}>
<Head> <Head>
@ -84,23 +90,23 @@ const Shell: NextPage = (): JSX.Element | null => {
<StackItem grow> <StackItem grow>
<SearchBox <SearchBox
placeholder="Find" placeholder="Find"
value={searchKeyword} value={state.searchKeyword}
onChange={handleSearchKeywordChange} onChange={handleSearchKeywordChange}
onSearch={findNext} onSearch={state.searchNext}
/> />
</StackItem> </StackItem>
<StackItem> <StackItem>
<IconButton <IconButton
disabled={!searchKeyword} disabled={!state.searchKeyword}
iconProps={UpIconProps} iconProps={UpIconProps}
onClick={findPrevious} onClick={state.searchPrevious}
/> />
</StackItem> </StackItem>
<StackItem> <StackItem>
<IconButton <IconButton
disabled={!searchKeyword} disabled={!state.searchKeyword}
iconProps={DownIconProps} iconProps={DownIconProps}
onClick={findNext} onClick={state.searchNext}
/> />
</StackItem> </StackItem>
</Stack> </Stack>

View file

@ -37,7 +37,7 @@ export class GlobalState {
showErrorDialog(message: Error | string) { showErrorDialog(message: Error | string) {
this.errorDialogVisible = true; this.errorDialogVisible = true;
if (message instanceof Error) { if (message instanceof Error) {
this.errorDialogMessage = message.stack!; this.errorDialogMessage = message.stack || message.message;
} else { } else {
this.errorDialogMessage = message; this.errorDialogMessage = message;
} }

View file

@ -34,31 +34,30 @@ export class AdbWebUsbBackendStream implements ReadableWritablePair<AdbPacketDat
public constructor(device: USBDevice, inEndpoint: USBEndpoint, outEndpoint: USBEndpoint) { public constructor(device: USBDevice, inEndpoint: USBEndpoint, outEndpoint: USBEndpoint) {
const factory = new DuplexStreamFactory<AdbPacketData, Uint8Array>({ const factory = new DuplexStreamFactory<AdbPacketData, Uint8Array>({
close: async () => { close: async () => {
try { await device.close(); } catch { /* device may have already disconnected */ }
},
dispose: async () => {
navigator.usb.removeEventListener('disconnect', handleUsbDisconnect); navigator.usb.removeEventListener('disconnect', handleUsbDisconnect);
try {
await device.close();
} catch {
// device may have already disconnected
}
}, },
}); });
function handleUsbDisconnect(e: USBConnectionEvent) { function handleUsbDisconnect(e: USBConnectionEvent) {
if (e.device === device) { if (e.device === device) {
factory.close(); factory.dispose();
} }
} }
navigator.usb.addEventListener('disconnect', handleUsbDisconnect); navigator.usb.addEventListener('disconnect', handleUsbDisconnect);
this._readable = factory.createWrapReadable(new ReadableStream<AdbPacketData>({ this._readable = factory.wrapReadable(new ReadableStream<AdbPacketData>({
async pull(controller) { async pull(controller) {
// The `length` argument in `transferIn` must not be smaller than what the device sent, // The `length` argument in `transferIn` must not be smaller than what the device sent,
// otherwise it will return `babble` status without any data. // otherwise it will return `babble` status without any data.
// Here we read exactly 24 bytes (packet header) followed by exactly `payloadLength`. // Here we read exactly 24 bytes (packet header) followed by exactly `payloadLength`.
const result = await device.transferIn(inEndpoint.endpointNumber, 24); const result = await device.transferIn(inEndpoint.endpointNumber, 24);
// TODO: webusb-backend: handle `babble` by discarding the data and receive again // TODO: webusb: handle `babble` by discarding the data and receive again
// TODO: webusb: on Windows, `transferIn` throws an NetworkError when device disconnected, check with other OSs.
// From spec, the `result.data` always covers the whole `buffer`. // From spec, the `result.data` always covers the whole `buffer`.
const buffer = new Uint8Array(result.data!.buffer); const buffer = new Uint8Array(result.data!.buffer);

View file

@ -1,4 +1,4 @@
import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb'; import { AdbPacket, AdbPacketSerializeStream, DuplexStreamFactory, pipeFrom, ReadableStream, StructDeserializeStream, type AdbBackend } from '@yume-chan/adb';
export default class AdbWsBackend implements AdbBackend { export default class AdbWsBackend implements AdbBackend {
public readonly serial: string; public readonly serial: string;
@ -28,10 +28,10 @@ export default class AdbWsBackend implements AdbBackend {
}); });
socket.onclose = () => { socket.onclose = () => {
factory.close(); factory.dispose();
}; };
const readable = factory.createReadable({ const readable = factory.wrapReadable(new ReadableStream({
start: (controller) => { start: (controller) => {
socket.onmessage = ({ data }: { data: ArrayBuffer; }) => { socket.onmessage = ({ data }: { data: ArrayBuffer; }) => {
controller.enqueue(new Uint8Array(data)); controller.enqueue(new Uint8Array(data));
@ -40,7 +40,7 @@ export default class AdbWsBackend implements AdbBackend {
}, { }, {
highWaterMark: 16 * 1024, highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; }, size(chunk) { return chunk.byteLength; },
}); }));
const writable = factory.createWritable({ const writable = factory.createWritable({
write: (chunk) => { write: (chunk) => {

View file

@ -5,7 +5,7 @@ import { AdbAuthenticationProcessor, ADB_DEFAULT_AUTHENTICATORS, type AdbCredent
import { AdbPower, AdbReverseCommand, AdbSubprocess, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install, type AdbFrameBuffer } from './commands/index.js'; import { AdbPower, AdbReverseCommand, AdbSubprocess, AdbSync, AdbTcpIpCommand, escapeArg, framebuffer, install, type AdbFrameBuffer } from './commands/index.js';
import { AdbFeatures } from './features.js'; import { AdbFeatures } from './features.js';
import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from './packet.js'; import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from './packet.js';
import { AdbPacketDispatcher, type AdbSocket } from './socket/index.js'; import { AdbIncomingSocketHandler, AdbPacketDispatcher, type AdbSocket, type Closeable } from './socket/index.js';
import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from "./stream/index.js"; import { AbortController, DecodeUtf8Stream, GatherStringStream, WritableStream, type ReadableWritablePair } from "./stream/index.js";
import { decodeUtf8, encodeUtf8 } from "./utils/index.js"; import { decodeUtf8, encodeUtf8 } from "./utils/index.js";
@ -18,7 +18,7 @@ export enum AdbPropKey {
export const VERSION_OMIT_CHECKSUM = 0x01000001; export const VERSION_OMIT_CHECKSUM = 0x01000001;
export class Adb { export class Adb implements Closeable {
/** /**
* It's possible to call `authenticate` multiple times on a single connection, * It's possible to call `authenticate` multiple times on a single connection,
* every time the device receives a `CNXN` packet, it resets its internal state, * every time the device receives a `CNXN` packet, it resets its internal state,
@ -53,7 +53,7 @@ export class Adb {
await sendPacket(response); await sendPacket(response);
break; break;
default: default:
// Maybe the previous ADB session exited without reading all packets, // Maybe the previous ADB client exited without reading all packets,
// so they are still waiting in OS internal buffer. // so they are still waiting in OS internal buffer.
// Just ignore them. // Just ignore them.
// Because a `Connect` packet will reset the device, // Because a `Connect` packet will reset the device,
@ -78,15 +78,17 @@ export class Adb {
try { try {
// https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 // https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252
// There are some other feature constants, but some of them are only used by ADB server, not devices. // There are some other feature constants, but some of them are only used by ADB server, not devices (daemons).
const features = [ const features = [
AdbFeatures.ShellV2, AdbFeatures.ShellV2,
AdbFeatures.Cmd, AdbFeatures.Cmd,
AdbFeatures.StatV2, AdbFeatures.StatV2,
AdbFeatures.ListV2, AdbFeatures.ListV2,
'fixed_push_mkdir', AdbFeatures.FixedPushMkdir,
'apex', 'apex',
'abb', 'abb',
// only tells the client the symlink timestamp issue in `adb push --sync` has been fixed.
// No special handling required.
'fixed_push_symlink_timestamp', 'fixed_push_symlink_timestamp',
'abb_exec', 'abb_exec',
'remount_shell', 'remount_shell',
@ -130,9 +132,9 @@ export class Adb {
} }
} }
private readonly packetDispatcher: AdbPacketDispatcher; private readonly dispatcher: AdbPacketDispatcher;
public get disconnected() { return this.packetDispatcher.disconnected; } public get disconnected() { return this.dispatcher.disconnected; }
private _protocolVersion: number | undefined; private _protocolVersion: number | undefined;
public get protocolVersion() { return this._protocolVersion; } public get protocolVersion() { return this._protocolVersion; }
@ -172,7 +174,7 @@ export class Adb {
appendNullToServiceString = true; appendNullToServiceString = true;
} }
this.packetDispatcher = new AdbPacketDispatcher( this.dispatcher = new AdbPacketDispatcher(
connection, connection,
{ {
calculateChecksum, calculateChecksum,
@ -185,7 +187,7 @@ export class Adb {
this.subprocess = new AdbSubprocess(this); this.subprocess = new AdbSubprocess(this);
this.power = new AdbPower(this); this.power = new AdbPower(this);
this.reverse = new AdbReverseCommand(this.packetDispatcher); this.reverse = new AdbReverseCommand(this);
this.tcpip = new AdbTcpIpCommand(this); this.tcpip = new AdbTcpIpCommand(this);
} }
@ -224,8 +226,12 @@ export class Adb {
} }
} }
public addIncomingSocketHandler(handler: AdbIncomingSocketHandler) {
return this.dispatcher.addIncomingSocketHandler(handler);
}
public async createSocket(service: string): Promise<AdbSocket> { public async createSocket(service: string): Promise<AdbSocket> {
return this.packetDispatcher.createSocket(service); return this.dispatcher.createSocket(service);
} }
public async createSocketAndWait(service: string): Promise<string> { public async createSocketAndWait(service: string): Promise<string> {
@ -264,7 +270,7 @@ export class Adb {
return framebuffer(this); return framebuffer(this);
} }
public async dispose(): Promise<void> { public async close(): Promise<void> {
this.packetDispatcher.dispose(); await this.dispatcher.close();
} }
} }

View file

@ -2,15 +2,11 @@
import { AutoDisposable } from '@yume-chan/event'; import { AutoDisposable } from '@yume-chan/event';
import Struct from '@yume-chan/struct'; import Struct from '@yume-chan/struct';
import type { AdbPacketData } from '../packet.js'; import type { Adb } from "../adb.js";
import type { AdbIncomingSocketEventArgs, AdbPacketDispatcher, AdbSocket } from '../socket/index.js'; import type { AdbIncomingSocketHandler, AdbSocket } from '../socket/index.js';
import { AdbBufferedStream } from '../stream/index.js'; import { AdbBufferedStream } from '../stream/index.js';
import { decodeUtf8 } from "../utils/index.js"; import { decodeUtf8 } from "../utils/index.js";
export interface AdbReverseHandler {
onSocket(packet: AdbPacketData, socket: AdbSocket): void;
}
export interface AdbForwardListener { export interface AdbForwardListener {
deviceSerial: string; deviceSerial: string;
@ -32,37 +28,30 @@ const AdbReverseErrorResponse =
}); });
export class AdbReverseCommand extends AutoDisposable { export class AdbReverseCommand extends AutoDisposable {
protected localPortToHandler = new Map<number, AdbReverseHandler>(); protected localPortToHandler = new Map<number, AdbIncomingSocketHandler>();
protected deviceAddressToLocalPort = new Map<string, number>(); protected deviceAddressToLocalPort = new Map<string, number>();
protected dispatcher: AdbPacketDispatcher; protected adb: Adb;
protected listening = false; protected listening = false;
public constructor(dispatcher: AdbPacketDispatcher) { public constructor(adb: Adb) {
super(); super();
this.dispatcher = dispatcher; this.adb = adb;
this.addDisposable(this.dispatcher.onIncomingSocket(this.handleIncomingSocket, this)); this.addDisposable(this.adb.addIncomingSocketHandler(this.handleIncomingSocket));
} }
protected handleIncomingSocket(e: AdbIncomingSocketEventArgs): void { protected handleIncomingSocket = async (socket: AdbSocket) => {
if (e.handled) { const address = socket.serviceString;
return;
}
const address = decodeUtf8(e.packet.payload);
// Address format: `tcp:12345\0` // Address format: `tcp:12345\0`
const port = Number.parseInt(address.substring(4)); const port = Number.parseInt(address.substring(4));
if (this.localPortToHandler.has(port)) { return !!(await this.localPortToHandler.get(port)?.(socket));
this.localPortToHandler.get(port)!.onSocket(e.packet, e.socket); };
e.handled = true;
}
}
private async createBufferedStream(service: string) { private async createBufferedStream(service: string) {
const socket = await this.dispatcher.createSocket(service); const socket = await this.adb.createSocket(service);
return new AdbBufferedStream(socket); return new AdbBufferedStream(socket);
} }
@ -96,7 +85,7 @@ export class AdbReverseCommand extends AutoDisposable {
public async add( public async add(
deviceAddress: string, deviceAddress: string,
localPort: number, localPort: number,
handler: AdbReverseHandler, handler: AdbIncomingSocketHandler,
): Promise<string> { ): Promise<string> {
const stream = await this.sendRequest(`reverse:forward:${deviceAddress};tcp:${localPort}`); const stream = await this.sendRequest(`reverse:forward:${deviceAddress};tcp:${localPort}`);

View file

@ -1,6 +1,6 @@
import type { Adb } from "../../../adb.js"; import type { Adb } from "../../../adb.js";
import type { AdbSocket } from "../../../socket/index.js"; import type { AdbSocket } from "../../../socket/index.js";
import { DuplexStreamFactory, type ReadableStream } from "../../../stream/index.js"; import { DuplexStreamFactory, ReadableStream } from "../../../stream/index.js";
import type { AdbSubprocessProtocol } from "./types.js"; import type { AdbSubprocessProtocol } from "./types.js";
/** /**
@ -53,8 +53,8 @@ export class AdbSubprocessNoneProtocol implements AdbSubprocessProtocol {
}, },
}); });
this._stdout = factory.createWrapReadable(this.socket.readable); this._stdout = factory.wrapReadable(this.socket.readable);
this._stderr = factory.createReadable(); this._stderr = factory.wrapReadable(new ReadableStream());
this._exit = factory.closed.then(() => 0); this._exit = factory.closed.then(() => 0);
} }

View file

@ -9,7 +9,7 @@ export const AdbSyncDataResponse =
.uint8Array('data', { lengthField: 'dataLength' }) .uint8Array('data', { lengthField: 'dataLength' })
.extra({ id: AdbSyncResponseId.Data as const }); .extra({ id: AdbSyncResponseId.Data as const });
const ResponseTypes = { const RESPONSE_TYPES = {
[AdbSyncResponseId.Data]: AdbSyncDataResponse, [AdbSyncResponseId.Data]: AdbSyncDataResponse,
[AdbSyncResponseId.Done]: new AdbSyncDoneResponse(AdbSyncDataResponse.size), [AdbSyncResponseId.Done]: new AdbSyncDoneResponse(AdbSyncDataResponse.size),
}; };
@ -24,7 +24,7 @@ export function adbSyncPull(
await adbSyncWriteRequest(writer, AdbSyncRequestId.Receive, path); await adbSyncWriteRequest(writer, AdbSyncRequestId.Receive, path);
}, },
async pull(controller) { async pull(controller) {
const response = await adbSyncReadResponse(stream, ResponseTypes); const response = await adbSyncReadResponse(stream, RESPONSE_TYPES);
switch (response.id) { switch (response.id) {
case AdbSyncResponseId.Data: case AdbSyncResponseId.Data:
controller.enqueue(response.data!); controller.enqueue(response.data!);
@ -35,7 +35,10 @@ export function adbSyncPull(
default: default:
throw new Error('Unexpected response id'); throw new Error('Unexpected response id');
} }
} },
cancel() {
throw new Error(`Sync commands don't support cancel.`);
},
}, { }, {
highWaterMark: 16 * 1024, highWaterMark: 16 * 1024,
size(chunk) { return chunk.byteLength; } size(chunk) { return chunk.byteLength; }

View file

@ -1,20 +1,12 @@
import { AsyncOperationManager, PromiseResolver } from '@yume-chan/async'; import { AsyncOperationManager, PromiseResolver } from '@yume-chan/async';
import { AutoDisposable, EventEmitter } from '@yume-chan/event'; import type { RemoveEventListener } from '@yume-chan/event';
import type { ValueOrPromise } from "@yume-chan/struct";
import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from '../packet.js'; import { AdbCommand, calculateChecksum, type AdbPacketData, type AdbPacketInit } from '../packet.js';
import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '../stream/index.js'; import { AbortController, WritableStream, WritableStreamDefaultWriter, type ReadableWritablePair } from '../stream/index.js';
import { decodeUtf8, encodeUtf8 } from '../utils/index.js'; import { decodeUtf8, encodeUtf8 } from '../utils/index.js';
import { AdbSocket, AdbSocketController } from './socket.js'; import { AdbSocket, AdbSocketController } from './socket.js';
export interface AdbIncomingSocketEventArgs {
handled: boolean;
packet: AdbPacketData;
serviceString: string;
socket: AdbSocket;
}
const EmptyUint8Array = new Uint8Array(0); const EmptyUint8Array = new Uint8Array(0);
export interface AdbPacketDispatcherOptions { export interface AdbPacketDispatcherOptions {
@ -29,7 +21,23 @@ export interface AdbPacketDispatcherOptions {
maxPayloadSize: number; maxPayloadSize: number;
} }
export class AdbPacketDispatcher extends AutoDisposable { export type AdbIncomingSocketHandler = (socket: AdbSocket) => ValueOrPromise<boolean>;
export interface Closeable {
close(): ValueOrPromise<void>;
}
/**
* The dispatcher is the "dumb" part of the connection handling logic.
*
* Except some options to change some minor behaviors,
* its only job is forwarding packets between authenticated underlying streams
* and abstracted socket objects.
*
* The `Adb` class is responsible for doing the authentication,
* negotiating the options, and has shortcuts to high-level services.
*/
export class AdbPacketDispatcher implements Closeable {
// ADB socket id starts from 1 // ADB socket id starts from 1
// (0 means open failed) // (0 means open failed)
private readonly initializers = new AsyncOperationManager(1); private readonly initializers = new AsyncOperationManager(1);
@ -42,8 +50,7 @@ export class AdbPacketDispatcher extends AutoDisposable {
private _disconnected = new PromiseResolver<void>(); private _disconnected = new PromiseResolver<void>();
public get disconnected() { return this._disconnected.promise; } public get disconnected() { return this._disconnected.promise; }
private readonly incomingSocketEvent = this.addDisposable(new EventEmitter<AdbIncomingSocketEventArgs>()); private _incomingSocketHandlers: Set<AdbIncomingSocketHandler> = new Set();
public get onIncomingSocket() { return this.incomingSocketEvent.event; }
private _abortController = new AbortController(); private _abortController = new AbortController();
@ -51,41 +58,42 @@ export class AdbPacketDispatcher extends AutoDisposable {
connection: ReadableWritablePair<AdbPacketData, AdbPacketInit>, connection: ReadableWritablePair<AdbPacketData, AdbPacketInit>,
options: AdbPacketDispatcherOptions options: AdbPacketDispatcherOptions
) { ) {
super();
this.options = options; this.options = options;
connection.readable connection.readable
.pipeTo(new WritableStream({ .pipeTo(new WritableStream({
write: async (packet) => { write: async (packet) => {
try {
switch (packet.command) { switch (packet.command) {
case AdbCommand.OK: case AdbCommand.OK:
this.handleOk(packet); this.handleOk(packet);
return; break;
case AdbCommand.Close: case AdbCommand.Close:
await this.handleClose(packet); await this.handleClose(packet);
return; break;
case AdbCommand.Write: case AdbCommand.Write:
if (this.sockets.has(packet.arg1)) { if (this.sockets.has(packet.arg1)) {
await this.sockets.get(packet.arg1)!.enqueue(packet.payload); await this.sockets.get(packet.arg1)!.enqueue(packet.payload);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0); await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
break;
} }
throw new Error(`Unknown local socket id: ${packet.arg1}`);
// Maybe the device is responding to a packet of last connection
// Just ignore it
return;
case AdbCommand.Open: case AdbCommand.Open:
await this.handleOpen(packet); await this.handleOpen(packet);
return; break;
} default:
} catch (e) { // Junk data may only appear in the authentication phase,
// Throw error here will stop the pipe // since the dispatcher only works after authentication,
// But won't close `readable` because of `preventCancel: true` // all packets should have a valid command.
throw e; // (although it's possible that Adb added new commands in the future)
throw new Error(`Unknown command: ${packet.command.toString(16)}`);
} }
}, },
}), { }), {
// There are multiple reasons for the pipe to stop,
// including device disconnection, protocol error, or user abort,
// if the underlying streams are still open,
// it's still possible to create another ADB connection.
// So don't close `readable` here.
preventCancel: false, preventCancel: false,
signal: this._abortController.signal, signal: this._abortController.signal,
}) })
@ -125,13 +133,18 @@ export class AdbPacketDispatcher extends AutoDisposable {
* CLOSE() operations. * CLOSE() operations.
*/ */
// So don't return if `reject` didn't find a pending socket // If the socket is still pending
if (packet.arg0 === 0 && if (packet.arg0 === 0 &&
this.initializers.reject(packet.arg1, new Error('Socket open failed'))) { this.initializers.reject(packet.arg1, new Error('Socket open failed'))) {
// Device failed to create the socket // Device failed to create the socket
// (unknown service string, failed to execute command, etc.)
// it doesn't break the connection,
// so only reject the socket creation promise,
// don't throw an error here.
return; return;
} }
// Ignore `arg0` and search for the socket
const socket = this.sockets.get(packet.arg1); const socket = this.sockets.get(packet.arg1);
if (socket) { if (socket) {
// The device want to close the socket // The device want to close the socket
@ -143,8 +156,18 @@ export class AdbPacketDispatcher extends AutoDisposable {
return; return;
} }
// Maybe the device is responding to a packet of last connection // TODO: adb: is double closing an socket a catastrophic error?
// Just ignore it // If the client sends two `CLSE` packets for one socket,
// the device may also respond with two `CLSE` packets.
}
public addIncomingSocketHandler(handler: AdbIncomingSocketHandler): RemoveEventListener {
this._incomingSocketHandlers.add(handler);
const remove = () => {
this._incomingSocketHandlers.delete(handler);
};
remove.dispose = remove;
return remove;
} }
private async handleOpen(packet: AdbPacketData) { private async handleOpen(packet: AdbPacketData) {
@ -164,22 +187,17 @@ export class AdbPacketDispatcher extends AutoDisposable {
serviceString, serviceString,
}); });
const args: AdbIncomingSocketEventArgs = { for (const handler of this._incomingSocketHandlers) {
handled: false, if (await handler(controller.socket)) {
packet,
serviceString,
socket: controller.socket,
};
this.incomingSocketEvent.fire(args);
if (args.handled) {
this.sockets.set(localId, controller); this.sockets.set(localId, controller);
await this.sendPacket(AdbCommand.OK, localId, remoteId); await this.sendPacket(AdbCommand.OK, localId, remoteId);
} else { return;
await this.sendPacket(AdbCommand.Close, 0, remoteId);
} }
} }
await this.sendPacket(AdbCommand.Close, 0, remoteId);
}
public async createSocket(serviceString: string): Promise<AdbSocket> { public async createSocket(serviceString: string): Promise<AdbSocket> {
if (this.options.appendNullToServiceString) { if (this.options.appendNullToServiceString) {
serviceString += '\0'; serviceString += '\0';
@ -250,21 +268,34 @@ export class AdbPacketDispatcher extends AutoDisposable {
await this._writer.write(init as AdbPacketInit); await this._writer.write(init as AdbPacketInit);
} }
public override dispose() { public async close() {
// Send `CLSE` packets for all sockets
await Promise.all(
Array.from(
this.sockets.values(),
socket => socket.close(),
)
);
// Stop receiving
// It's possible that we haven't received all `CLSE` confirm packets,
// but it doesn't matter, the next connection can cope with them.
try {
this._abortController.abort();
} catch { }
// Adb connection doesn't have a method to confirm closing,
// so call `dispose` immediately
this.dispose();
}
private dispose() {
for (const socket of this.sockets.values()) { for (const socket of this.sockets.values()) {
socket.dispose(); socket.dispose();
} }
this.sockets.clear();
try {
// Stop pipes
this._abortController.abort();
} catch { }
this._writer.releaseLock(); this._writer.releaseLock();
this._disconnected.resolve(); this._disconnected.resolve();
super.dispose();
} }
} }

View file

@ -1,7 +1,8 @@
import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async";
import type { Disposable } from "@yume-chan/event";
import { AdbCommand } from '../packet.js'; import { AdbCommand } from '../packet.js';
import { ChunkStream, DuplexStreamFactory, pipeFrom, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair, type WritableStream } from '../stream/index.js'; import { ChunkStream, DuplexStreamFactory, pipeFrom, PushReadableStream, type PushReadableStreamController, type ReadableStream, type ReadableWritablePair, type WritableStream } from '../stream/index.js';
import type { AdbPacketDispatcher } from './dispatcher.js'; import type { AdbPacketDispatcher, Closeable } from './dispatcher.js';
export interface AdbSocketInfo { export interface AdbSocketInfo {
localId: number; localId: number;
@ -17,7 +18,7 @@ export interface AdbSocketConstructionOptions extends AdbSocketInfo {
highWaterMark?: number | undefined; highWaterMark?: number | undefined;
} }
export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<Uint8Array, Uint8Array> { export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<Uint8Array, Uint8Array>, Closeable, Disposable {
private readonly dispatcher!: AdbPacketDispatcher; private readonly dispatcher!: AdbPacketDispatcher;
public readonly localId!: number; public readonly localId!: number;
@ -49,16 +50,31 @@ export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<
this._factory = new DuplexStreamFactory<Uint8Array, Uint8Array>({ this._factory = new DuplexStreamFactory<Uint8Array, Uint8Array>({
close: async () => { close: async () => {
await this.close(); await this.dispatcher.sendPacket(
AdbCommand.Close,
this.localId,
this.remoteId
);
// Don't `dispose` here, we need to wait for `CLSE` response packet.
return false;
},
dispose: () => {
this._closed = true;
// Error out the pending writes
this._writePromise?.reject(new Error('Socket closed'));
}, },
}); });
this._readable = this._factory.createPushReadable(controller => { this._readable = this._factory.wrapReadable(
new PushReadableStream(controller => {
this._readableController = controller; this._readableController = controller;
}, { }, {
highWaterMark: options.highWaterMark ?? 16 * 1024, highWaterMark: options.highWaterMark ?? 16 * 1024,
size(chunk) { return chunk.byteLength; } size(chunk) { return chunk.byteLength; }
}); })
);
this.writable = pipeFrom( this.writable = pipeFrom(
this._factory.createWritable({ this._factory.createWritable({
@ -89,32 +105,22 @@ export class AdbSocketController implements AdbSocketInfo, ReadableWritablePair<
} }
public async close(): Promise<void> { public async close(): Promise<void> {
// Error out the pending writes this._factory.close();
this._writePromise?.reject(new Error('Socket closed'));
if (!this._closed) {
this._closed = true;
// Only send close packet when `close` is called before `dispose`
// (the client initiated the close)
await this.dispatcher.sendPacket(
AdbCommand.Close,
this.localId,
this.remoteId
);
}
} }
public dispose() { public dispose() {
this._closed = true; this._factory.dispose();
this._factory.close();
// Close `writable` side
this.close();
} }
} }
/**
* AdbSocket is a duplex stream.
*
* To close it, call either `socket.close()`,
* `socket.readable.cancel()`, `socket.readable.getReader().cancel()`,
* `socket.writable.abort()`, `socket.writable.getWriter().abort()`,
* `socket.writable.close()` or `socket.writable.getWriter().close()`.
*/
export class AdbSocket implements AdbSocketInfo, ReadableWritablePair<Uint8Array, Uint8Array>{ export class AdbSocket implements AdbSocketInfo, ReadableWritablePair<Uint8Array, Uint8Array>{
private _controller: AdbSocketController; private _controller: AdbSocketController;

View file

@ -1,9 +1,10 @@
import { ReadableStream } from "./index.js";
import { DuplexStreamFactory } from './transform.js'; import { DuplexStreamFactory } from './transform.js';
describe('DuplexStreamFactory', () => { describe('DuplexStreamFactory', () => {
it('should close all readable', async () => { it('should close all readable', async () => {
const factory = new DuplexStreamFactory(); const factory = new DuplexStreamFactory();
const readable = factory.createReadable(); const readable = factory.wrapReadable(new ReadableStream());
const reader = readable.getReader(); const reader = readable.getReader();
await factory.close(); await factory.close();
await reader.closed; await reader.closed;

View file

@ -3,12 +3,12 @@ import type Struct from "@yume-chan/struct";
import type { StructValueType, ValueOrPromise } from "@yume-chan/struct"; import type { StructValueType, ValueOrPromise } from "@yume-chan/struct";
import { decodeUtf8 } from "../utils/index.js"; import { decodeUtf8 } from "../utils/index.js";
import { BufferedStream, BufferedStreamEndedError } from "./buffered.js"; import { BufferedStream, BufferedStreamEndedError } from "./buffered.js";
import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair, type UnderlyingSink, type UnderlyingSource } from "./detect.js"; import { AbortController, AbortSignal, ReadableStream, ReadableStreamDefaultReader, TransformStream, WritableStream, WritableStreamDefaultWriter, type QueuingStrategy, type ReadableStreamDefaultController, type ReadableWritablePair, type UnderlyingSink } from "./detect.js";
export interface DuplexStreamFactoryOptions { export interface DuplexStreamFactoryOptions {
preventCloseReadableStreams?: boolean | undefined; close?: (() => ValueOrPromise<boolean | void>) | undefined;
close?: (() => void | Promise<void>) | undefined; dispose?: (() => void | Promise<void>) | undefined;
} }
/** /**
@ -19,83 +19,39 @@ export interface DuplexStreamFactoryOptions {
*/ */
export class DuplexStreamFactory<R, W> { export class DuplexStreamFactory<R, W> {
private readableControllers: ReadableStreamDefaultController<R>[] = []; private readableControllers: ReadableStreamDefaultController<R>[] = [];
private pushReadableControllers: PushReadableStreamController<R>[] = [];
private _writableClosed = false;
public get writableClosed() { return this._writableClosed; }
private _closed = new PromiseResolver<void>(); private _closed = new PromiseResolver<void>();
public get closed() { return this._closed.promise; } public get closed() { return this._closed.promise; }
private options: DuplexStreamFactoryOptions; private options: DuplexStreamFactoryOptions;
private _closeRequestedByReadable = false;
private _writableClosed = false;
public constructor(options?: DuplexStreamFactoryOptions) { public constructor(options?: DuplexStreamFactoryOptions) {
this.options = options ?? {}; this.options = options ?? {};
} }
public createPushReadable(source: PushReadableStreamSource<R>, strategy?: QueuingStrategy<R>): PushReadableStream<R> { public wrapReadable(readable: ReadableStream<R>): WrapReadableStream<R> {
return new PushReadableStream<R>(controller => {
this.pushReadableControllers.push(controller);
controller.abortSignal.addEventListener('abort', async () => {
this._closeRequestedByReadable = true;
await this.close();
});
source({
abortSignal: controller.abortSignal,
async enqueue(chunk) {
await controller.enqueue(chunk);
},
close: async () => {
// The source signals stream ended,
// usually means the other end closed the connection first.
controller.close();
this._closeRequestedByReadable = true;
await this.close();
},
error: async (e?: any) => {
controller.error(e);
this._closeRequestedByReadable = true;
await this.close();
},
});
}, strategy);
};
public createWrapReadable(wrapper: ReadableStream<R> | WrapReadableStreamStart<R> | ReadableStreamWrapper<R>): WrapReadableStream<R> {
return new WrapReadableStream<R>({ return new WrapReadableStream<R>({
async start() { start: (controller) => {
return getWrappedReadableStream(wrapper); this.readableControllers.push(controller);
return readable;
},
cancel: async () => {
// cancel means the local peer closes the connection first.
await this.close();
}, },
close: async () => { close: async () => {
if ('close' in wrapper) { // stream end means the remote peer closed the connection first.
await wrapper.close?.(); await this.dispose();
}
this._closeRequestedByReadable = true;
await this.close();
}, },
}); });
} }
public createReadable(source?: UnderlyingSource<R>, strategy?: QueuingStrategy<R>): ReadableStream<R> {
return new ReadableStream<R>({
start: async (controller) => {
this.readableControllers.push(controller);
await source?.start?.(controller);
},
pull: (controller) => {
return source?.pull?.(controller);
},
cancel: async (reason) => {
await source?.cancel?.(reason);
this._closeRequestedByReadable = true;
await this.close();
},
}, strategy);
}
public createWritable(sink: UnderlyingSink<W>, strategy?: QueuingStrategy<W>): WritableStream<W> { public createWritable(sink: UnderlyingSink<W>, strategy?: QueuingStrategy<W>): WritableStream<W> {
// `WritableStream` has no way to tell if the remote peer has closed the connection.
// So it only triggers `close`.
return new WritableStream<W>({ return new WritableStream<W>({
start: async (controller) => { start: async (controller) => {
await sink.start?.(controller); await sink.start?.(controller);
@ -107,41 +63,37 @@ export class DuplexStreamFactory<R, W> {
await sink.write?.(chunk, controller); await sink.write?.(chunk, controller);
}, },
close: async () => {
await sink.close?.();
this.close();
},
abort: async (reason) => { abort: async (reason) => {
await sink.abort?.(reason); await sink.abort?.(reason);
await this.close(); await this.close();
}, },
close: async () => {
await sink.close?.();
await this.close();
},
}, strategy); }, strategy);
} }
public async closeReadableStreams() { public async close() {
if (this._writableClosed) {
return;
}
this._writableClosed = true;
if (await this.options.close?.() !== false) {
// `close` can return `false` to disable automatic `dispose`.
await this.dispose();
}
}
public async dispose() {
this._writableClosed = true;
this._closed.resolve(); this._closed.resolve();
await this.options.close?.();
for (const controller of this.readableControllers) { for (const controller of this.readableControllers) {
try { try { controller.close(); } catch { }
controller.close();
} catch { }
} }
for (const controller of this.pushReadableControllers) { await this.options.dispose?.();
try {
controller.close();
} catch { }
}
}
public async close() {
this._writableClosed = true;
if (this._closeRequestedByReadable ||
!this.options.preventCloseReadableStreams) {
await this.closeReadableStreams();
}
} }
} }
@ -290,20 +242,22 @@ export class WrapWritableStream<T> extends WritableStream<T> {
} }
} }
export type WrapReadableStreamStart<T> = () => ValueOrPromise<ReadableStream<T>>; export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>;
export interface ReadableStreamWrapper<T> { export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>; start: WrapReadableStreamStart<T>;
close?(): Promise<void>; cancel?(reason?: any): ValueOrPromise<void>;
close?(): ValueOrPromise<void>;
} }
function getWrappedReadableStream<T>( function getWrappedReadableStream<T>(
wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T> wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T>,
controller: ReadableStreamDefaultController<T>
) { ) {
if ('start' in wrapper) { if ('start' in wrapper) {
return wrapper.start(); return wrapper.start(controller);
} else if (typeof wrapper === 'function') { } else if (typeof wrapper === 'function') {
return wrapper(); return wrapper(controller);
} else { } else {
// Can't use `wrapper instanceof ReadableStream` // Can't use `wrapper instanceof ReadableStream`
// Because we want to be compatible with any ReadableStream-like objects // Because we want to be compatible with any ReadableStream-like objects
@ -311,6 +265,13 @@ function getWrappedReadableStream<T>(
} }
} }
/**
* This class has multiple usages:
*
* 1. Get notified when the stream is cancelled or closed.
* 2. Synchronously create a `ReadableStream` by asynchronously return another `ReadableStream`.
* 3. Convert native `ReadableStream`s to polyfilled ones so they can `pipe` between.
*/
export class WrapReadableStream<T> extends ReadableStream<T>{ export class WrapReadableStream<T> extends ReadableStream<T>{
public readable!: ReadableStream<T>; public readable!: ReadableStream<T>;
@ -318,20 +279,20 @@ export class WrapReadableStream<T> extends ReadableStream<T>{
public constructor(wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T>) { public constructor(wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T>) {
super({ super({
start: async () => { start: async (controller) => {
// `start` is invoked before `ReadableStream`'s constructor finish, // `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes // so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this. // Queue a microtask to avoid this.
await Promise.resolve(); await Promise.resolve();
this.readable = await getWrappedReadableStream(wrapper); this.readable = await getWrappedReadableStream(wrapper, controller);
this.reader = this.readable.getReader(); this.reader = this.readable.getReader();
}, },
cancel: async (reason) => { cancel: async (reason) => {
await this.reader.cancel(reason); await this.reader.cancel(reason);
if ('close' in wrapper) { if ('cancel' in wrapper) {
await wrapper.close?.(); await wrapper.cancel?.(reason);
} }
}, },
pull: async (controller) => { pull: async (controller) => {

View file

@ -103,11 +103,14 @@ export class ScrcpyClientReverseConnection extends ScrcpyClientConnection {
const queue = new TransformStream<AdbSocket>(); const queue = new TransformStream<AdbSocket>();
this.streams = queue.readable.getReader(); this.streams = queue.readable.getReader();
const writer = queue.writable.getWriter(); const writer = queue.writable.getWriter();
this.address = await this.device.reverse.add('localabstract:scrcpy', 27183, { this.address = await this.device.reverse.add(
onSocket: (packet, stream) => { 'localabstract:scrcpy',
writer.write(stream); 27183,
socket => {
writer.write(socket);
return true;
}, },
}); );
} }
private async accept(): Promise<AdbSocket> { private async accept(): Promise<AdbSocket> {