1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-03 01:39:28 +02:00

Discovery: Refactor and add Avahi DBus backend (#1347)

* discovery: use opaque error type for DnsSdError

This helps to decouple discovery and core by not leaking implementation
details of the zeroconf backend into Error conversion impls in core.

* discovery: map all MDNS/DNS-SD errors to DiscoveryError::DnsSdError

previously, libmdns errors would use a generic conversion
from std::io::Error to core::Error

* discovery: use an opaque type for the handle to the DNS-SD service

* discovery: make features additive

i.e. add with-libmdns instead of using not(with-dns-sd).

The logic is such that enabling with-dns-sd in addition to the default
with-libmdns will still end up using dns-sd, as before.
If only with-libmdns is enabled, that will be the default.
If none of the features is enabled, attempting to build a `Discovery`
will yield an error.

* discovery: add --zeroconf-backend CLI flag

* discovery: Add minimal Avahi zeroconf backend

* bump MSRV to 1.75

required by zbus >= 4

* discovery: ensure that server and dns-sd backend shutdown gracefully

Previously, on drop the the shutdown_tx/close_tx, it wasn't guaranteed
the corresponding tasks would continue to be polled until they actually
completed their shutdown.

Since dns_sd::Service is not Send and non-async, and because libmdns is
non-async, put them on their own threads.

* discovery: use a shared channel for server and zeroconf status messages

* discovery: add Avahi reconnection logic

This deals gracefully with the case where the Avahi daemon is restarted
or not running initially.

* discovery: allow running when compiled without zeroconf backend...

...but exit with an error if there's no way to authenticate

* better error messages for invalid options with no short flag
This commit is contained in:
Benedikt 2024-10-26 16:45:02 +02:00 committed by GitHub
parent d2324ddd1b
commit 94d174c33d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1156 additions and 129 deletions

View file

@ -21,13 +21,16 @@ hmac = "0.12"
hyper = { version = "1.3", features = ["http1"] }
hyper-util = { version = "0.1", features = ["server-auto", "server-graceful", "service"] }
http-body-util = "0.1.1"
libmdns = "0.9"
libmdns = { version = "0.9", optional = true }
log = "0.4"
rand = "0.8"
serde = { version = "1", default-features = false, features = ["derive"], optional = true }
serde_repr = "0.1"
serde_json = "1.0"
sha1 = "0.10"
thiserror = "1.0"
tokio = { version = "1", features = ["parking_lot", "sync", "rt"] }
zbus = { version = "4", default-features = false, features = ["tokio"], optional = true }
[dependencies.librespot-core]
path = "../core"
@ -39,4 +42,8 @@ hex = "0.4"
tokio = { version = "1", features = ["macros", "parking_lot", "rt"] }
[features]
with-dns-sd = ["dns-sd", "librespot-core/with-dns-sd"]
with-avahi = ["zbus", "serde"]
with-dns-sd = ["dns-sd"]
with-libmdns = ["libmdns"]
default = ["with-libmdns"]

151
discovery/src/avahi.rs Normal file
View file

@ -0,0 +1,151 @@
#![cfg(feature = "with-avahi")]
#[allow(unused)]
pub use server::ServerProxy;
#[allow(unused)]
pub use entry_group::{
EntryGroupProxy, EntryGroupState, StateChangedStream as EntryGroupStateChangedStream,
};
mod server {
// This is not the full interface, just the methods we need!
// Avahi also implements a newer version of the interface ("org.freedesktop.Avahi.Server2"), but
// the additions are not relevant for us, and the older version is not intended to be deprecated.
// cf. the release notes for 0.8 at https://github.com/avahi/avahi/blob/master/docs/NEWS
#[zbus::proxy(
interface = "org.freedesktop.Avahi.Server",
default_service = "org.freedesktop.Avahi",
default_path = "/",
gen_blocking = false
)]
trait Server {
/// EntryGroupNew method
#[zbus(object = "super::entry_group::EntryGroup")]
fn entry_group_new(&self);
/// GetState method
fn get_state(&self) -> zbus::Result<i32>;
/// StateChanged signal
#[zbus(signal)]
fn state_changed(&self, state: i32, error: &str) -> zbus::Result<()>;
}
}
mod entry_group {
use serde_repr::Deserialize_repr;
use zbus::zvariant;
#[derive(Clone, Copy, Debug, Deserialize_repr)]
#[repr(i32)]
pub enum EntryGroupState {
// The group has not yet been committed, the user must still call avahi_entry_group_commit()
Uncommited = 0,
// The entries of the group are currently being registered
Registering = 1,
// The entries have successfully been established
Established = 2,
// A name collision for one of the entries in the group has been detected, the entries have been withdrawn
Collision = 3,
// Some kind of failure happened, the entries have been withdrawn
Failure = 4,
}
impl zvariant::Type for EntryGroupState {
fn signature() -> zvariant::Signature<'static> {
zvariant::Signature::try_from("i").unwrap()
}
}
#[zbus::proxy(
interface = "org.freedesktop.Avahi.EntryGroup",
default_service = "org.freedesktop.Avahi",
gen_blocking = false
)]
trait EntryGroup {
/// AddAddress method
fn add_address(
&self,
interface: i32,
protocol: i32,
flags: u32,
name: &str,
address: &str,
) -> zbus::Result<()>;
/// AddRecord method
#[allow(clippy::too_many_arguments)]
fn add_record(
&self,
interface: i32,
protocol: i32,
flags: u32,
name: &str,
clazz: u16,
type_: u16,
ttl: u32,
rdata: &[u8],
) -> zbus::Result<()>;
/// AddService method
#[allow(clippy::too_many_arguments)]
fn add_service(
&self,
interface: i32,
protocol: i32,
flags: u32,
name: &str,
type_: &str,
domain: &str,
host: &str,
port: u16,
txt: &[&[u8]],
) -> zbus::Result<()>;
/// AddServiceSubtype method
#[allow(clippy::too_many_arguments)]
fn add_service_subtype(
&self,
interface: i32,
protocol: i32,
flags: u32,
name: &str,
type_: &str,
domain: &str,
subtype: &str,
) -> zbus::Result<()>;
/// Commit method
fn commit(&self) -> zbus::Result<()>;
/// Free method
fn free(&self) -> zbus::Result<()>;
/// GetState method
fn get_state(&self) -> zbus::Result<EntryGroupState>;
/// IsEmpty method
fn is_empty(&self) -> zbus::Result<bool>;
/// Reset method
fn reset(&self) -> zbus::Result<()>;
/// UpdateServiceTxt method
#[allow(clippy::too_many_arguments)]
fn update_service_txt(
&self,
interface: i32,
protocol: i32,
flags: u32,
name: &str,
type_: &str,
domain: &str,
txt: &[&[u8]],
) -> zbus::Result<()>;
/// StateChanged signal
#[zbus(signal)]
fn state_changed(&self, state: EntryGroupState, error: &str) -> zbus::Result<()>;
}
}

View file

@ -7,17 +7,19 @@
//! This library uses mDNS and DNS-SD so that other devices can find it,
//! and spawns an http server to answer requests of Spotify clients.
mod avahi;
mod server;
use std::{
borrow::Cow,
io,
error::Error as StdError,
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use self::server::DiscoveryServer;
@ -30,6 +32,88 @@ pub use crate::core::authentication::Credentials;
/// Determining the icon in the list of available devices.
pub use crate::core::config::DeviceType;
pub enum DiscoveryEvent {
Credentials(Credentials),
ServerError(DiscoveryError),
ZeroconfError(DiscoveryError),
}
enum ZeroconfCmd {
Shutdown,
}
pub struct DnsSdHandle {
task_handle: tokio::task::JoinHandle<()>,
shutdown_tx: oneshot::Sender<ZeroconfCmd>,
}
impl DnsSdHandle {
async fn shutdown(self) {
log::debug!("Shutting down zeroconf responder");
let Self {
task_handle,
shutdown_tx,
} = self;
if shutdown_tx.send(ZeroconfCmd::Shutdown).is_err() {
log::warn!("Zeroconf responder unexpectedly disappeared");
} else {
let _ = task_handle.await;
log::debug!("Zeroconf responder stopped");
}
}
}
pub type DnsSdServiceBuilder = fn(
Cow<'static, str>,
Vec<std::net::IpAddr>,
u16,
mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error>;
// Default goes first: This matches the behaviour when feature flags were exlusive, i.e. when there
// was only `feature = "with-dns-sd"` or `not(feature = "with-dns-sd")`
pub const BACKENDS: &[(
&str,
// If None, the backend is known but wasn't compiled.
Option<DnsSdServiceBuilder>,
)] = &[
#[cfg(feature = "with-avahi")]
("avahi", Some(launch_avahi)),
#[cfg(not(feature = "with-avahi"))]
("avahi", None),
#[cfg(feature = "with-dns-sd")]
("dns-sd", Some(launch_dns_sd)),
#[cfg(not(feature = "with-dns-sd"))]
("dns-sd", None),
#[cfg(feature = "with-libmdns")]
("libmdns", Some(launch_libmdns)),
#[cfg(not(feature = "with-libmdns"))]
("libmdns", None),
];
pub fn find(name: Option<&str>) -> Result<DnsSdServiceBuilder, Error> {
if let Some(ref name) = name {
match BACKENDS.iter().find(|(id, _)| name == id) {
Some((_id, Some(launch_svc))) => Ok(*launch_svc),
Some((_id, None)) => Err(Error::unavailable(format!(
"librespot built without '{}' support",
name
))),
None => Err(Error::not_found(format!(
"unknown zeroconf backend '{}'",
name
))),
}
} else {
BACKENDS
.iter()
.find_map(|(_, launch_svc)| *launch_svc)
.ok_or(Error::unavailable(
"librespot built without zeroconf backends",
))
}
}
/// Makes this device visible to Spotify clients in the local network.
///
/// `Discovery` implements the [`Stream`] trait. Every time this device
@ -37,10 +121,11 @@ pub use crate::core::config::DeviceType;
pub struct Discovery {
server: DiscoveryServer,
#[cfg(not(feature = "with-dns-sd"))]
_svc: libmdns::Service,
#[cfg(feature = "with-dns-sd")]
_svc: dns_sd::DNSService,
/// An opaque handle to the DNS-SD service. Dropping this will unregister the service.
#[allow(unused)]
svc: DnsSdHandle,
event_rx: mpsc::UnboundedReceiver<DiscoveryEvent>,
}
/// A builder for [`Discovery`].
@ -48,6 +133,7 @@ pub struct Builder {
server_config: server::Config,
port: u16,
zeroconf_ip: Vec<std::net::IpAddr>,
zeroconf_backend: Option<DnsSdServiceBuilder>,
}
/// Errors that can occur while setting up a [`Discovery`] instance.
@ -55,16 +141,27 @@ pub struct Builder {
pub enum DiscoveryError {
#[error("Creating SHA1 block cipher failed")]
AesError(#[from] aes::cipher::InvalidLength),
#[error("Setting up dns-sd failed: {0}")]
DnsSdError(#[from] io::Error),
DnsSdError(#[source] Box<dyn StdError + Send + Sync>),
#[error("Creating SHA1 HMAC failed for base key {0:?}")]
HmacError(Vec<u8>),
#[error("Setting up the HTTP server failed: {0}")]
HttpServerError(#[from] hyper::Error),
#[error("Missing params for key {0}")]
ParamsError(&'static str),
}
#[cfg(feature = "with-avahi")]
impl From<zbus::Error> for DiscoveryError {
fn from(error: zbus::Error) -> Self {
Self::DnsSdError(Box::new(error))
}
}
impl From<DiscoveryError> for Error {
fn from(err: DiscoveryError) -> Self {
match err {
@ -77,6 +174,264 @@ impl From<DiscoveryError> for Error {
}
}
#[allow(unused)]
const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp";
#[allow(unused)]
const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"];
#[cfg(feature = "with-avahi")]
async fn avahi_task(
name: Cow<'static, str>,
port: u16,
entry_group: &mut Option<avahi::EntryGroupProxy<'_>>,
) -> Result<(), DiscoveryError> {
use self::avahi::{EntryGroupState, ServerProxy};
use futures_util::StreamExt;
let conn = zbus::Connection::system().await?;
// Wait for the daemon to show up.
// On error: Failed to listen for NameOwnerChanged signal => Fatal DBus issue
let bus = zbus::fdo::DBusProxy::new(&conn).await?;
let mut stream = bus
.receive_name_owner_changed_with_args(&[(0, "org.freedesktop.Avahi")])
.await?;
loop {
// Wait for Avahi daemon to be started
'wait_avahi: {
while let Poll::Ready(Some(_)) = futures_util::poll!(stream.next()) {
// Drain queued name owner changes, since we're going to connect in a second
}
// Ping after we connected to the signal since it might have shown up in the meantime
if let Ok(avahi_peer) =
zbus::fdo::PeerProxy::new(&conn, "org.freedesktop.Avahi", "/").await
{
if avahi_peer.ping().await.is_ok() {
log::debug!("Pinged Avahi: Available");
break 'wait_avahi;
}
}
log::warn!("Failed to connect to Avahi, zeroconf discovery will not work until avahi-daemon is started. Check that it is installed and running");
// If it didn't, wait for the signal
match stream.next().await {
Some(_signal) => {
log::debug!("Avahi appeared");
break 'wait_avahi;
}
// The stream ended, but this should never happen
None => {
return Err(zbus::Error::Failure("DBus disappeared".to_owned()).into());
}
}
}
// Connect to Avahi and publish the service
let avahi_server = ServerProxy::new(&conn).await?;
log::trace!("Connected to Avahi");
*entry_group = Some(avahi_server.entry_group_new().await?);
let mut entry_group_state_stream = entry_group
.as_mut()
.unwrap()
.receive_state_changed()
.await?;
entry_group
.as_mut()
.unwrap()
.add_service(
-1, // AVAHI_IF_UNSPEC
-1, // IPv4 and IPv6
0, // flags
&name,
DNS_SD_SERVICE_NAME, // type
"", // domain: let the server choose
"", // host: let the server choose
port,
&TXT_RECORD.map(|s| s.as_bytes()),
)
.await?;
entry_group.as_mut().unwrap().commit().await?;
log::debug!("Commited zeroconf service with name {}", &name);
'monitor_service: loop {
tokio::select! {
Some(state_changed) = entry_group_state_stream.next() => {
let (state, error) = match state_changed.args() {
Ok(sc) => (sc.state, sc.error),
Err(e) => {
log::warn!("Error on receiving EntryGroup state from Avahi: {}", e);
continue 'monitor_service;
}
};
match state {
EntryGroupState::Uncommited | EntryGroupState::Registering => {
// Not yet registered, ignore.
}
EntryGroupState::Established => {
log::info!("Published zeroconf service");
}
EntryGroupState::Collision => {
// This most likely means that librespot has unintentionally been started twice.
// Thus, don't retry with a new name, but abort.
//
// Note that the error would usually already be returned by
// entry_group.add_service above, so this state_changed handler
// won't be hit.
//
// EntryGroup has been withdrawn at this point already!
log::error!("zeroconf collision for name '{}'", &name);
return Err(zbus::Error::Failure(format!("zeroconf collision for name: {}", name)).into());
}
EntryGroupState::Failure => {
// TODO: Back off/treat as fatal?
// EntryGroup has been withdrawn at this point already!
// There seems to be no code in Avahi that actually sets this state.
log::error!("zeroconf failure: {}", error);
return Err(zbus::Error::Failure(format!("zeroconf failure: {}", error)).into());
}
}
}
_name_owner_change = stream.next() => {
break 'monitor_service;
}
}
}
// Avahi disappeared (or the service was immediately taken over by a
// new daemon) => drop all handles, and reconnect
log::info!("Avahi disappeared, trying to reconnect");
}
}
#[cfg(feature = "with-avahi")]
fn launch_avahi(
name: Cow<'static, str>,
_zeroconf_ip: Vec<std::net::IpAddr>,
port: u16,
status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task_handle = tokio::spawn(async move {
let mut entry_group = None;
tokio::select! {
res = avahi_task(name, port, &mut entry_group) => {
if let Err(e) = res {
log::error!("Avahi error: {}", e);
let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
}
},
_ = shutdown_rx => {
if let Some(entry_group) = entry_group.as_mut() {
if let Err(e) = entry_group.free().await {
log::warn!("Failed to un-publish zeroconf service: {}", e);
} else {
log::debug!("Un-published zeroconf service");
}
}
},
}
});
Ok(DnsSdHandle {
task_handle,
shutdown_tx,
})
}
#[cfg(feature = "with-dns-sd")]
fn launch_dns_sd(
name: Cow<'static, str>,
_zeroconf_ip: Vec<std::net::IpAddr>,
port: u16,
status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task_handle = tokio::task::spawn_blocking(move || {
let inner = move || -> Result<(), DiscoveryError> {
let svc = dns_sd::DNSService::register(
Some(name.as_ref()),
DNS_SD_SERVICE_NAME,
None,
None,
port,
&TXT_RECORD,
)
.map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?;
let _ = shutdown_rx.blocking_recv();
std::mem::drop(svc);
Ok(())
};
if let Err(e) = inner() {
log::error!("dns_sd error: {}", e);
let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
}
});
Ok(DnsSdHandle {
shutdown_tx,
task_handle,
})
}
#[cfg(feature = "with-libmdns")]
fn launch_libmdns(
name: Cow<'static, str>,
zeroconf_ip: Vec<std::net::IpAddr>,
port: u16,
status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task_handle = tokio::task::spawn_blocking(move || {
let inner = move || -> Result<(), DiscoveryError> {
let svc = if !zeroconf_ip.is_empty() {
libmdns::Responder::spawn_with_ip_list(
&tokio::runtime::Handle::current(),
zeroconf_ip,
)
} else {
libmdns::Responder::spawn(&tokio::runtime::Handle::current())
}
.map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))
.unwrap()
.register(
DNS_SD_SERVICE_NAME.to_owned(),
name.into_owned(),
port,
&TXT_RECORD,
);
let _ = shutdown_rx.blocking_recv();
std::mem::drop(svc);
Ok(())
};
if let Err(e) = inner() {
log::error!("libmdns error: {}", e);
let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
}
});
Ok(DnsSdHandle {
shutdown_tx,
task_handle,
})
}
impl Builder {
/// Starts a new builder using the provided device and client IDs.
pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Self {
@ -90,6 +445,7 @@ impl Builder {
},
port: 0,
zeroconf_ip: vec![],
zeroconf_backend: None,
}
}
@ -117,6 +473,12 @@ impl Builder {
self
}
/// Set the zeroconf (MDNS and DNS-SD) implementation to use.
pub fn zeroconf_backend(mut self, zeroconf_backend: DnsSdServiceBuilder) -> Self {
self.zeroconf_backend = Some(zeroconf_backend);
self
}
/// Sets the port on which it should listen to incoming connections.
/// The default value `0` means any port.
pub fn port(mut self, port: u16) -> Self {
@ -129,43 +491,21 @@ impl Builder {
/// # Errors
/// If setting up the mdns service or creating the server fails, this function returns an error.
pub fn launch(self) -> Result<Discovery, Error> {
let name = self.server_config.name.clone();
let zeroconf_ip = self.zeroconf_ip;
let (event_tx, event_rx) = mpsc::unbounded_channel();
let mut port = self.port;
let name = self.server_config.name.clone().into_owned();
let server = DiscoveryServer::new(self.server_config, &mut port)?;
let _zeroconf_ip = self.zeroconf_ip;
let svc;
let server = DiscoveryServer::new(self.server_config, &mut port, event_tx.clone())?;
#[cfg(feature = "with-dns-sd")]
{
svc = dns_sd::DNSService::register(
Some(name.as_ref()),
"_spotify-connect._tcp",
None,
None,
port,
&["VERSION=1.0", "CPath=/"],
)?;
}
#[cfg(not(feature = "with-dns-sd"))]
{
let _svc = if !_zeroconf_ip.is_empty() {
libmdns::Responder::spawn_with_ip_list(
&tokio::runtime::Handle::current(),
_zeroconf_ip,
)?
} else {
libmdns::Responder::spawn(&tokio::runtime::Handle::current())?
};
svc = _svc.register(
"_spotify-connect._tcp".to_owned(),
name,
port,
&["VERSION=1.0", "CPath=/"],
);
}
Ok(Discovery { server, _svc: svc })
let launch_svc = self.zeroconf_backend.unwrap_or(find(None)?);
let svc = launch_svc(name, zeroconf_ip, port, event_tx)?;
Ok(Discovery {
server,
svc,
event_rx,
})
}
}
@ -179,12 +519,25 @@ impl Discovery {
pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Result<Self, Error> {
Self::builder(device_id, client_id).launch()
}
pub async fn shutdown(self) {
tokio::join!(self.server.shutdown(), self.svc.shutdown(),);
}
}
impl Stream for Discovery {
type Item = Credentials;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.server).poll_next(cx)
match Pin::new(&mut self.event_rx).poll_recv(cx) {
// Yields credentials
Poll::Ready(Some(DiscoveryEvent::Credentials(creds))) => Poll::Ready(Some(creds)),
// Also terminate the stream on fatal server or MDNS/DNS-SD errors.
Poll::Ready(Some(
DiscoveryEvent::ServerError(_) | DiscoveryEvent::ZeroconfError(_),
)) => Poll::Ready(None),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -1,18 +1,14 @@
use std::{
borrow::Cow,
collections::BTreeMap,
convert::Infallible,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use aes::cipher::{KeyIvInit, StreamCipher};
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::engine::Engine as _;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::{FutureExt, TryFutureExt};
use hmac::{Hmac, Mac};
use http_body_util::{BodyExt, Full};
@ -24,7 +20,7 @@ use serde_json::json;
use sha1::{Digest, Sha1};
use tokio::sync::{mpsc, oneshot};
use super::DiscoveryError;
use super::{DiscoveryError, DiscoveryEvent};
use crate::{
core::config::DeviceType,
@ -47,21 +43,17 @@ struct RequestHandler {
config: Config,
username: Mutex<Option<String>>,
keys: DhLocalKeys,
tx: mpsc::UnboundedSender<Credentials>,
event_tx: mpsc::UnboundedSender<DiscoveryEvent>,
}
impl RequestHandler {
fn new(config: Config) -> (Self, mpsc::UnboundedReceiver<Credentials>) {
let (tx, rx) = mpsc::unbounded_channel();
let discovery = Self {
fn new(config: Config, event_tx: mpsc::UnboundedSender<DiscoveryEvent>) -> Self {
Self {
config,
username: Mutex::new(None),
keys: DhLocalKeys::random(&mut rand::thread_rng()),
tx,
};
(discovery, rx)
event_tx,
}
}
fn active_user(&self) -> String {
@ -202,7 +194,8 @@ impl RequestHandler {
{
let maybe_username = self.username.lock();
self.tx.send(credentials)?;
self.event_tx
.send(DiscoveryEvent::Credentials(credentials))?;
if let Ok(mut username_field) = maybe_username {
*username_field = Some(String::from(username));
} else {
@ -258,14 +251,22 @@ impl RequestHandler {
}
}
pub(crate) enum DiscoveryServerCmd {
Shutdown,
}
pub struct DiscoveryServer {
cred_rx: mpsc::UnboundedReceiver<Credentials>,
_close_tx: oneshot::Sender<Infallible>,
close_tx: oneshot::Sender<DiscoveryServerCmd>,
task_handle: tokio::task::JoinHandle<()>,
}
impl DiscoveryServer {
pub fn new(config: Config, port: &mut u16) -> Result<Self, Error> {
let (discovery, cred_rx) = RequestHandler::new(config);
pub fn new(
config: Config,
port: &mut u16,
event_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<Self, Error> {
let discovery = RequestHandler::new(config, event_tx);
let address = if cfg!(windows) {
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port)
} else {
@ -297,7 +298,7 @@ impl DiscoveryServer {
}
}
tokio::spawn(async move {
let task_handle = tokio::spawn(async move {
let discovery = Arc::new(discovery);
let server = hyper::server::conn::http1::Builder::new();
@ -326,27 +327,32 @@ impl DiscoveryServer {
});
}
_ = &mut close_rx => {
debug!("Shutting down discovery server");
break;
}
}
}
graceful.shutdown().await;
debug!("Discovery server stopped");
});
Ok(Self {
cred_rx,
_close_tx: close_tx,
close_tx,
task_handle,
})
}
}
impl Stream for DiscoveryServer {
type Item = Credentials;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Credentials>> {
self.cred_rx.poll_recv(cx)
pub async fn shutdown(self) {
let Self {
close_tx,
task_handle,
..
} = self;
log::debug!("Shutting down discovery server");
if close_tx.send(DiscoveryServerCmd::Shutdown).is_err() {
log::warn!("Discovery server unexpectedly disappeared");
} else {
let _ = task_handle.await;
log::debug!("Discovery server stopped");
}
}
}