1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-03 01:39:28 +02:00

Migrate librespot-connect to tokio 1.0

This commit is contained in:
johannesd3 2021-02-20 00:17:18 +01:00 committed by Johannes Dertmann
parent afacaea15f
commit daf7ecd23a
7 changed files with 478 additions and 370 deletions

View file

@ -102,46 +102,48 @@ impl MercuryManager {
MercurySender::new(self.clone(), uri.into())
}
pub async fn subscribe<T: Into<String>>(
pub fn subscribe<T: Into<String>>(
&self,
uri: T,
) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
) -> impl Future<Output = Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>> + 'static
{
let uri = uri.into();
let response = self
.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
})
.await?;
let (tx, rx) = mpsc::unbounded();
let manager = self.clone();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
}
let request = self.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
});
Ok(rx)
let manager = self.clone();
async move {
let response = request.await?;
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
}
});
Ok(rx)
}
}
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {