diff --git a/Cargo.lock b/Cargo.lock index 282888dd..7fe8387d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1453,7 +1453,9 @@ dependencies = [ "cfg-if 1.0.0", "env_logger", "error-chain", - "futures", + "futures-core", + "futures-sink", + "futures-util", "hmac", "httparse", "hyper", @@ -1464,15 +1466,14 @@ dependencies = [ "num-traits", "once_cell", "pbkdf2", - "pin-project-lite", "protobuf", - "rand 0.7.3", + "rand 0.8.3", "serde", - "serde_derive", "serde_json", "sha-1 0.9.4", "shannon", "tokio", + "tokio-stream", "tokio-util", "url 1.7.2", "vergen", @@ -2819,6 +2820,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1981ad97df782ab506a1f43bf82c967326960d278acf3bf8279809648c3ff3ea" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.3" diff --git a/core/Cargo.toml b/core/Cargo.toml index f1a15eb1..85f3be62 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,7 +19,9 @@ byteorder = "1.4" bytes = "1.0" cfg-if = "1" error-chain = { version = "0.12", default-features = false } -futures = { version = "0.3", features = ["bilock", "unstable"] } +futures-core = { version = "0.3", default-features = false } +futures-sink = { version = "0.3", default-features = false } +futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } hmac = "0.10" httparse = "1.3" hyper = { version = "0.14", optional = true, features = ["client", "tcp", "http1"] } @@ -28,21 +30,20 @@ num-bigint = "0.3" num-integer = "0.1" num-traits = "0.2" once_cell = "1.5.2" -pbkdf2 = { version = "0.7", default_features = false, features = ["hmac"] } -pin-project-lite = "0.2.4" +pbkdf2 = { version = "0.7", default-features = false, features = ["hmac"] } protobuf = "~2.14.0" -rand = "0.7" -serde = "1.0" -serde_derive = "1.0" +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha-1 = "0.9" shannon = "0.2.0" -tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } +tokio = { version = "1.0", features = ["io-util", "net", "rt", "sync"] } +tokio-stream = "0.1" tokio-util = { version = "0.6", features = ["codec"] } url = "1.7" [build-dependencies] -rand = "0.7" +rand = "0.8" vergen = "3.0.4" [dev-dependencies] diff --git a/core/build.rs b/core/build.rs index 83f50472..0fc29335 100644 --- a/core/build.rs +++ b/core/build.rs @@ -7,10 +7,10 @@ fn main() { flags.toggle(ConstantsFlags::REBUILD_ON_HEAD_CHANGE); generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!"); - let mut rng = rand::thread_rng(); - let build_id: String = ::std::iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) + let build_id: String = rand::thread_rng() + .sample_iter(Alphanumeric) .take(8) + .map(char::from) .collect(); println!("cargo:rustc-env=VERGEN_BUILD_ID={}", build_id); } diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index cd354d88..7698691c 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -9,6 +9,7 @@ cfg_if! { use std::error::Error; use hyper::{Body, Client, Method, Request, Uri}; + use serde::{Serialize, Deserialize}; use crate::proxytunnel::ProxyTunnel; diff --git a/core/src/audio_key.rs b/core/src/audio_key.rs index b9f0c232..3bce1c73 100644 --- a/core/src/audio_key.rs +++ b/core/src/audio_key.rs @@ -1,8 +1,8 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::channel::oneshot; use std::collections::HashMap; use std::io::Write; +use tokio::sync::oneshot; use crate::spotify_id::{FileId, SpotifyId}; use crate::util::SeqGenerator; diff --git a/core/src/authentication.rs b/core/src/authentication.rs index fa570409..ff477df5 100644 --- a/core/src/authentication.rs +++ b/core/src/authentication.rs @@ -1,10 +1,13 @@ +use std::io::{self, Read}; +use std::ops::FnOnce; + use aes::Aes192; use byteorder::{BigEndian, ByteOrder}; use hmac::Hmac; use pbkdf2::pbkdf2; use protobuf::ProtobufEnum; +use serde::{Deserialize, Serialize}; use sha1::{Digest, Sha1}; -use std::io::{self, Read}; use crate::protocol::authentication::AuthenticationType; use crate::protocol::keyexchange::{APLoginFailed, ErrorCode}; diff --git a/core/src/channel.rs b/core/src/channel.rs index 7ada05d5..54eee184 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -1,12 +1,14 @@ +use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt}; -use std::{ - collections::HashMap, - pin::Pin, - task::{Context, Poll}, - time::Instant, -}; +use futures_core::Stream; +use futures_util::lock::BiLock; +use futures_util::StreamExt; +use tokio::sync::mpsc; use crate::util::SeqGenerator; @@ -46,7 +48,7 @@ enum ChannelState { impl ChannelManager { pub fn allocate(&self) -> (u16, Channel) { - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let seq = self.lock(|inner| { let seq = inner.sequence.get(); @@ -85,7 +87,7 @@ impl ChannelManager { inner.download_measurement_bytes += data.len(); if let Entry::Occupied(entry) = inner.channels.entry(id) { - let _ = entry.get().unbounded_send((cmd, data)); + let _ = entry.get().send((cmd, data)); } }); } @@ -105,7 +107,7 @@ impl ChannelManager { impl Channel { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll> { - let (cmd, packet) = match self.receiver.poll_next_unpin(cx) { + let (cmd, packet) = match self.receiver.poll_recv(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(o) => o.ok_or(ChannelError)?, }; diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 68e2e7a5..6bdbde6a 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -4,7 +4,7 @@ mod handshake; pub use self::codec::APCodec; pub use self::handshake::handshake; -use futures::{SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt}; use protobuf::{self, Message}; use std::io; use std::net::ToSocketAddrs; diff --git a/core/src/keymaster.rs b/core/src/keymaster.rs index 87b3f1e3..8c3c00a2 100644 --- a/core/src/keymaster.rs +++ b/core/src/keymaster.rs @@ -1,3 +1,5 @@ +use serde::Deserialize; + use crate::{mercury::MercuryError, session::Session}; #[derive(Deserialize, Debug, Clone)] diff --git a/core/src/lib.rs b/core/src/lib.rs index 65fa898a..54f83f17 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -5,10 +5,6 @@ extern crate log; #[macro_use] extern crate cfg_if; #[macro_use] -extern crate serde_derive; -#[macro_use] -extern crate pin_project_lite; -#[macro_use] extern crate error_chain; use librespot_protocol as protocol; diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 4baa674f..537ff2cb 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -1,14 +1,17 @@ +use std::collections::HashMap; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; +use tokio::sync::{mpsc, oneshot}; + use crate::protocol; use crate::util::url_encode; use crate::util::SeqGenerator; -use byteorder::{BigEndian, ByteOrder}; -use bytes::Bytes; -use futures::{ - channel::{mpsc, oneshot}, - Future, -}; -use std::{collections::HashMap, task::Poll}; -use std::{mem, pin::Pin, task::Context}; mod types; pub use self::types::*; @@ -31,18 +34,15 @@ pub struct MercuryPending { callback: Option>>, } -pin_project! { - pub struct MercuryFuture { - #[pin] - receiver: oneshot::Receiver> - } +pub struct MercuryFuture { + receiver: oneshot::Receiver>, } impl Future for MercuryFuture { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project().receiver.poll(cx) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.receiver).poll(cx) { Poll::Ready(Ok(x)) => Poll::Ready(x), Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)), Poll::Pending => Poll::Pending, @@ -119,7 +119,7 @@ impl MercuryManager { async move { let response = request.await?; - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); manager.lock(move |inner| { if !inner.invalid { @@ -221,7 +221,7 @@ impl MercuryManager { // if send fails, remove from list of subs // TODO: send unsub message - sub.unbounded_send(response.clone()).is_ok() + sub.send(response.clone()).is_ok() } else { // URI doesn't match true diff --git a/core/src/session.rs b/core/src/session.rs index b0eca0c0..858a0b69 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,14 +1,19 @@ +use std::future::Future; +use std::io; +use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, Weak}; +use std::task::Context; use std::task::Poll; use std::time::{SystemTime, UNIX_EPOCH}; -use std::{io, pin::Pin, task::Context}; - -use once_cell::sync::OnceCell; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt}; +use futures_core::TryStream; +use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use once_cell::sync::OnceCell; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::apresolve::apresolve_or_fallback; use crate::audio_key::AudioKeyManager; @@ -87,7 +92,7 @@ impl Session { ) -> Session { let (sink, stream) = transport.split(); - let (sender_tx, sender_rx) = mpsc::unbounded(); + let (sender_tx, sender_rx) = mpsc::unbounded_channel(); let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); debug!("new Session[{}]", session_id); @@ -114,11 +119,13 @@ impl Session { session_id: session_id, })); - let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink); + let sender_task = UnboundedReceiverStream::new(sender_rx) + .map(Ok) + .forward(sink); let receiver_task = DispatchTask(stream, session.weak()); let task = - futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(())); + futures_util::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(())); tokio::spawn(task); session } @@ -193,7 +200,7 @@ impl Session { } pub fn send_packet(&self, cmd: u8, data: Vec) { - self.0.tx_connection.unbounded_send((cmd, data)).unwrap(); + self.0.tx_connection.send((cmd, data)).unwrap(); } pub fn cache(&self) -> Option<&Arc> {