From 40e6355c34a72f7ac0980770082e06099eb91451 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Thu, 21 Jan 2021 21:49:39 +0100 Subject: [PATCH] Migrate core to tokio 1.0 --- core/Cargo.toml | 26 +++-- core/src/apresolve.rs | 126 +++++++++-------------- core/src/audio_key.rs | 22 +--- core/src/authentication.rs | 10 +- core/src/channel.rs | 91 +++++++++------- core/src/component.rs | 26 ----- core/src/connection/codec.rs | 7 +- core/src/connection/handshake.rs | 171 +++++++++---------------------- core/src/connection/mod.rs | 122 +++++++++------------- core/src/diffie_hellman.rs | 12 +-- core/src/keymaster.rs | 20 ++-- core/src/lib.rs | 24 ++--- core/src/mercury/mod.rs | 94 ++++++++--------- core/src/mercury/sender.rs | 35 ++++--- core/src/proxytunnel.rs | 135 +++++++----------------- core/src/session.rs | 146 +++++++++++--------------- 16 files changed, 406 insertions(+), 661 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 8511878c..a9fcc246 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -13,34 +13,32 @@ path = "../protocol" version = "0.1.3" [dependencies] +aes = "0.6" base64 = "0.13" -byteorder = "1.3" -bytes = "0.4" -error-chain = { version = "0.12", default_features = false } -futures = "0.1" +byteorder = "1.4" +bytes = "1.0" +futures = { version = "0.3", features = ["bilock", "unstable"] } +hmac = "0.7" httparse = "1.3" -hyper = "0.11" -hyper-proxy = { version = "0.4", default_features = false } -lazy_static = "1.3" +hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2", "stream"] } log = "0.4" num-bigint = "0.3" num-integer = "0.1" num-traits = "0.2" +once_cell = "1.5.2" +pbkdf2 = "0.3" +pin-project = "1.0" protobuf = "~2.14.0" rand = "0.7" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +sha-1 = "~0.8" shannon = "0.2.0" -tokio-codec = "0.1" -tokio-core = "0.1" -tokio-io = "0.1" +tokio = { version = "1.0", features = ["io-util", "rt-multi-thread", "macros" ] } +tokio-util = { version = "0.6", features = ["codec"] } url = "1.7" uuid = { version = "0.8", features = ["v4"] } -sha-1 = "0.8" -hmac = "0.7" -pbkdf2 = "0.3" -aes = "0.3" [build-dependencies] rand = "0.7" diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 94d94244..07c2958f 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,101 +1,69 @@ const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; -use futures::{Future, Stream}; -use hyper::client::HttpConnector; -use hyper::{self, Client, Method, Request, Uri}; -use hyper_proxy::{Intercept, Proxy, ProxyConnector}; -use serde_json; -use std::str::FromStr; -use tokio_core::reactor::Handle; +use hyper::{Body, Client, Method, Request, Uri}; +use std::error::Error; use url::Url; -error_chain! {} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct APResolveData { ap_list: Vec, } -fn apresolve( - handle: &Handle, - proxy: &Option, - ap_port: &Option, -) -> Box> { - let url = Uri::from_str(APRESOLVE_ENDPOINT).expect("invalid AP resolve URL"); - let use_proxy = proxy.is_some(); +async fn apresolve(proxy: &Option, ap_port: &Option) -> Result> { + let port = ap_port.unwrap_or(443); - let mut req = Request::new(Method::Get, url.clone()); - let response = match *proxy { - Some(ref val) => { - let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy"); - let proxy = Proxy::new(Intercept::All, proxy_url); - let connector = HttpConnector::new(4, handle); + let req = Request::builder() + .method(Method::GET) + .uri( + APRESOLVE_ENDPOINT + .parse::() + .expect("invalid AP resolve URL"), + ) + .body(Body::empty())?; + + let client = if proxy.is_some() { + todo!("proxies not yet supported") + /*let proxy = { + let proxy_url = val.as_str().parse().expect("invalid http proxy"); + let mut proxy = Proxy::new(Intercept::All, proxy_url); + let connector = HttpConnector::new(); let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); - if let Some(headers) = proxy_connector.http_headers(&url) { - req.headers_mut().extend(headers.iter()); - req.set_proxy(true); - } - let client = Client::configure().connector(proxy_connector).build(handle); - client.request(req) - } - _ => { - let client = Client::new(handle); - client.request(req) - } + proxy_connector + }; + + if let Some(headers) = proxy.http_headers(&APRESOLVE_ENDPOINT.parse().unwrap()) { + req.headers_mut().extend(headers.clone()); + }; + Client::builder().build(proxy)*/ + } else { + Client::new() }; - let body = response.and_then(|response| { - response.body().fold(Vec::new(), |mut acc, chunk| { - acc.extend_from_slice(chunk.as_ref()); - Ok::<_, hyper::Error>(acc) - }) - }); - let body = body.then(|result| result.chain_err(|| "HTTP error")); - let body = - body.and_then(|body| String::from_utf8(body).chain_err(|| "invalid UTF8 in response")); + let response = client.request(req).await?; - let data = body - .and_then(|body| serde_json::from_str::(&body).chain_err(|| "invalid JSON")); + let body = hyper::body::to_bytes(response.into_body()).await?; + let data: APResolveData = serde_json::from_slice(body.as_ref())?; - let p = ap_port.clone(); - - let ap = data.and_then(move |data| { - let mut aps = data.ap_list.iter().filter(|ap| { - if p.is_some() { - Uri::from_str(ap).ok().map_or(false, |uri| { - uri.port().map_or(false, |port| port == p.unwrap()) - }) - } else if use_proxy { - // It is unlikely that the proxy will accept CONNECT on anything other than 443. - Uri::from_str(ap) - .ok() - .map_or(false, |uri| uri.port().map_or(false, |port| port == 443)) + let ap = if ap_port.is_some() || proxy.is_some() { + data.ap_list.into_iter().find_map(|ap| { + if ap.parse::().ok()?.port()? == port { + Some(ap) } else { - true + None } - }); - - let ap = aps.next().ok_or("empty AP List")?; - Ok(ap.clone()) - }); - - Box::new(ap) + }) + } else { + data.ap_list.into_iter().next() + } + .ok_or("empty AP List")?; + Ok(ap) } -pub(crate) fn apresolve_or_fallback( - handle: &Handle, - proxy: &Option, - ap_port: &Option, -) -> Box> -where - E: 'static, -{ - let ap = apresolve(handle, proxy, ap_port).or_else(|e| { - warn!("Failed to resolve Access Point: {}", e.description()); +pub async fn apresolve_or_fallback(proxy: &Option, ap_port: &Option) -> String { + apresolve(proxy, ap_port).await.unwrap_or_else(|e| { + warn!("Failed to resolve Access Point: {}", e); warn!("Using fallback \"{}\"", AP_FALLBACK); - Ok(AP_FALLBACK.into()) - }); - - Box::new(ap) + AP_FALLBACK.into() + }) } diff --git a/core/src/audio_key.rs b/core/src/audio_key.rs index 1e5310c2..b9f0c232 100644 --- a/core/src/audio_key.rs +++ b/core/src/audio_key.rs @@ -1,7 +1,6 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; +use futures::channel::oneshot; use std::collections::HashMap; use std::io::Write; @@ -47,7 +46,7 @@ impl AudioKeyManager { } } - pub fn request(&self, track: SpotifyId, file: FileId) -> AudioKeyFuture { + pub async fn request(&self, track: SpotifyId, file: FileId) -> Result { let (tx, rx) = oneshot::channel(); let seq = self.lock(move |inner| { @@ -57,7 +56,7 @@ impl AudioKeyManager { }); self.send_key_request(seq, track, file); - AudioKeyFuture(rx) + rx.await.map_err(|_| AudioKeyError)? } fn send_key_request(&self, seq: u32, track: SpotifyId, file: FileId) { @@ -70,18 +69,3 @@ impl AudioKeyManager { self.session().send_packet(0xc, data) } } - -pub struct AudioKeyFuture(oneshot::Receiver>); -impl Future for AudioKeyFuture { - type Item = T; - type Error = AudioKeyError; - - fn poll(&mut self) -> Poll { - match self.0.poll() { - Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(oneshot::Canceled) => Err(AudioKeyError), - } - } -} diff --git a/core/src/authentication.rs b/core/src/authentication.rs index 36cbd439..dd39fd85 100644 --- a/core/src/authentication.rs +++ b/core/src/authentication.rs @@ -1,11 +1,9 @@ use aes::Aes192; -use base64; +use aes::NewBlockCipher; use byteorder::{BigEndian, ByteOrder}; use hmac::Hmac; use pbkdf2::pbkdf2; use protobuf::ProtobufEnum; -use serde; -use serde_json; use sha1::{Digest, Sha1}; use std::fs::File; use std::io::{self, Read, Write}; @@ -76,9 +74,9 @@ impl Credentials { // decrypt data using ECB mode without padding let blob = { - use aes::block_cipher_trait::generic_array::typenum::Unsigned; - use aes::block_cipher_trait::generic_array::GenericArray; - use aes::block_cipher_trait::BlockCipher; + use aes::cipher::generic_array::typenum::Unsigned; + use aes::cipher::generic_array::GenericArray; + use aes::cipher::BlockCipher; let mut data = base64::decode(encrypted_blob).unwrap(); let cipher = Aes192::new(GenericArray::from_slice(&key)); diff --git a/core/src/channel.rs b/core/src/channel.rs index b614fac4..7ada05d5 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -1,9 +1,12 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::sync::{mpsc, BiLock}; -use futures::{Async, Poll, Stream}; -use std::collections::HashMap; -use std::time::Instant; +use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt}; +use std::{ + collections::HashMap, + pin::Pin, + task::{Context, Poll}, + time::Instant, +}; use crate::util::SeqGenerator; @@ -101,12 +104,10 @@ impl ChannelManager { } impl Channel { - fn recv_packet(&mut self) -> Poll { - let (cmd, packet) = match self.receiver.poll() { - Ok(Async::Ready(Some(t))) => t, - Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(()) => unreachable!(), + fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll> { + let (cmd, packet) = match self.receiver.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(o) => o.ok_or(ChannelError)?, }; if cmd == 0xa { @@ -115,9 +116,9 @@ impl Channel { self.state = ChannelState::Closed; - Err(ChannelError) + Poll::Ready(Err(ChannelError)) } else { - Ok(Async::Ready(packet)) + Poll::Ready(Ok(packet)) } } @@ -129,16 +130,19 @@ impl Channel { } impl Stream for Channel { - type Item = ChannelEvent; - type Error = ChannelError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.state.clone() { ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Header(mut data) => { if data.len() == 0 { - data = try_ready!(self.recv_packet()); + data = match self.recv_packet(cx) { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))), + Poll::Pending => return Poll::Pending, + }; } let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; @@ -152,19 +156,23 @@ impl Stream for Channel { self.state = ChannelState::Header(data); let event = ChannelEvent::Header(header_id, header_data); - return Ok(Async::Ready(Some(event))); + return Poll::Ready(Some(Ok(event))); } } ChannelState::Data => { - let data = try_ready!(self.recv_packet()); + let data = match self.recv_packet(cx) { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))), + Poll::Pending => return Poll::Pending, + }; if data.len() == 0 { self.receiver.close(); self.state = ChannelState::Closed; - return Ok(Async::Ready(None)); + return Poll::Ready(None); } else { let event = ChannelEvent::Data(data); - return Ok(Async::Ready(Some(event))); + return Poll::Ready(Some(Ok(event))); } } } @@ -173,38 +181,45 @@ impl Stream for Channel { } impl Stream for ChannelData { - type Item = Bytes; - type Error = ChannelError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - let mut channel = match self.0.poll_lock() { - Async::Ready(c) => c, - Async::NotReady => return Ok(Async::NotReady), + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut channel = match self.0.poll_lock(cx) { + Poll::Ready(c) => c, + Poll::Pending => return Poll::Pending, }; loop { - match try_ready!(channel.poll()) { + let x = match channel.poll_next_unpin(cx) { + Poll::Ready(x) => x.transpose()?, + Poll::Pending => return Poll::Pending, + }; + match x { Some(ChannelEvent::Header(..)) => (), - Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))), - None => return Ok(Async::Ready(None)), + Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))), + None => return Poll::Ready(None), } } } } impl Stream for ChannelHeaders { - type Item = (u8, Vec); - type Error = ChannelError; + type Item = Result<(u8, Vec), ChannelError>; - fn poll(&mut self) -> Poll, Self::Error> { - let mut channel = match self.0.poll_lock() { - Async::Ready(c) => c, - Async::NotReady => return Ok(Async::NotReady), + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut channel = match self.0.poll_lock(cx) { + Poll::Ready(c) => c, + Poll::Pending => return Poll::Pending, }; - match try_ready!(channel.poll()) { - Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))), - Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)), + let x = match channel.poll_next_unpin(cx) { + Poll::Ready(x) => x.transpose()?, + Poll::Pending => return Poll::Pending, + }; + + match x { + Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))), + Some(ChannelEvent::Data(..)) | None => Poll::Ready(None), } } } diff --git a/core/src/component.rs b/core/src/component.rs index 50ab7b37..a761c455 100644 --- a/core/src/component.rs +++ b/core/src/component.rs @@ -35,29 +35,3 @@ macro_rules! component { } } } - -use std::cell::UnsafeCell; -use std::sync::Mutex; - -pub(crate) struct Lazy(Mutex, UnsafeCell>); -unsafe impl Sync for Lazy {} -unsafe impl Send for Lazy {} - -#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))] -impl Lazy { - pub(crate) fn new() -> Lazy { - Lazy(Mutex::new(false), UnsafeCell::new(None)) - } - - pub(crate) fn get T>(&self, f: F) -> &T { - let mut inner = self.0.lock().unwrap(); - if !*inner { - unsafe { - *self.1.get() = Some(f()); - } - *inner = true; - } - - unsafe { &*self.1.get() }.as_ref().unwrap() - } -} diff --git a/core/src/connection/codec.rs b/core/src/connection/codec.rs index fa4cd9d9..ead07b6e 100644 --- a/core/src/connection/codec.rs +++ b/core/src/connection/codec.rs @@ -2,7 +2,7 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, Bytes, BytesMut}; use shannon::Shannon; use std::io; -use tokio_io::codec::{Decoder, Encoder}; +use tokio_util::codec::{Decoder, Encoder}; const HEADER_SIZE: usize = 3; const MAC_SIZE: usize = 4; @@ -35,8 +35,7 @@ impl APCodec { } } -impl Encoder for APCodec { - type Item = (u8, Vec); +impl Encoder<(u8, Vec)> for APCodec { type Error = io::Error; fn encode(&mut self, item: (u8, Vec), buf: &mut BytesMut) -> io::Result<()> { @@ -45,7 +44,7 @@ impl Encoder for APCodec { buf.reserve(3 + payload.len()); buf.put_u8(cmd); - buf.put_u16_be(payload.len() as u16); + buf.put_u16(payload.len() as u16); buf.extend_from_slice(&payload); self.encode_cipher.nonce_u32(self.encode_nonce); diff --git a/core/src/connection/handshake.rs b/core/src/connection/handshake.rs index 220ab6e8..3810fc96 100644 --- a/core/src/connection/handshake.rs +++ b/core/src/connection/handshake.rs @@ -1,14 +1,11 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; -use futures::{Async, Future, Poll}; use hmac::{Hmac, Mac}; use protobuf::{self, Message}; use rand::thread_rng; use sha1::Sha1; -use std::io::{self, Read}; -use std::marker::PhantomData; -use tokio_codec::{Decoder, Framed}; -use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll}; -use tokio_io::{AsyncRead, AsyncWrite}; +use std::io; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio_util::codec::{Decoder, Framed}; use super::codec::APCodec; use crate::diffie_hellman::DHLocalKeys; @@ -16,72 +13,33 @@ use crate::protocol; use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext}; use crate::util; -pub struct Handshake { - keys: DHLocalKeys, - state: HandshakeState, -} - -enum HandshakeState { - ClientHello(WriteAll>), - APResponse(RecvPacket), - ClientResponse(Option, WriteAll>), -} - -pub fn handshake(connection: T) -> Handshake { +pub async fn handshake( + mut connection: T, +) -> io::Result> { let local_keys = DHLocalKeys::random(&mut thread_rng()); - let client_hello = client_hello(connection, local_keys.public_key()); + let gc = local_keys.public_key(); + let mut accumulator = client_hello(&mut connection, gc).await?; + let message: APResponseMessage = recv_packet(&mut connection, &mut accumulator).await?; + let remote_key = message + .get_challenge() + .get_login_crypto_challenge() + .get_diffie_hellman() + .get_gs() + .to_owned(); - Handshake { - keys: local_keys, - state: HandshakeState::ClientHello(client_hello), - } + let shared_secret = local_keys.shared_secret(&remote_key); + let (challenge, send_key, recv_key) = compute_keys(&shared_secret, &accumulator); + let codec = APCodec::new(&send_key, &recv_key); + + client_response(&mut connection, challenge).await?; + + Ok(codec.framed(connection)) } -impl Future for Handshake { - type Item = Framed; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - use self::HandshakeState::*; - loop { - self.state = match self.state { - ClientHello(ref mut write) => { - let (connection, accumulator) = try_ready!(write.poll()); - - let read = recv_packet(connection, accumulator); - APResponse(read) - } - - APResponse(ref mut read) => { - let (connection, message, accumulator) = try_ready!(read.poll()); - let remote_key = message - .get_challenge() - .get_login_crypto_challenge() - .get_diffie_hellman() - .get_gs() - .to_owned(); - - let shared_secret = self.keys.shared_secret(&remote_key); - let (challenge, send_key, recv_key) = - compute_keys(&shared_secret, &accumulator); - let codec = APCodec::new(&send_key, &recv_key); - - let write = client_response(connection, challenge); - ClientResponse(Some(codec), write) - } - - ClientResponse(ref mut codec, ref mut write) => { - let (connection, _) = try_ready!(write.poll()); - let codec = codec.take().unwrap(); - let framed = codec.framed(connection); - return Ok(Async::Ready(framed)); - } - } - } - } -} - -fn client_hello(connection: T, gc: Vec) -> WriteAll> { +async fn client_hello(connection: &mut T, gc: Vec) -> io::Result> +where + T: AsyncWrite + Unpin, +{ let mut packet = ClientHello::new(); packet .mut_build_info() @@ -106,13 +64,17 @@ fn client_hello(connection: T, gc: Vec) -> WriteAll(size).unwrap(); + as WriteBytesExt>::write_u32::(&mut buffer, size).unwrap(); packet.write_to_vec(&mut buffer).unwrap(); - write_all(connection, buffer) + connection.write_all(&buffer[..]).await?; + Ok(buffer) } -fn client_response(connection: T, challenge: Vec) -> WriteAll> { +async fn client_response(connection: &mut T, challenge: Vec) -> io::Result<()> +where + T: AsyncWrite + Unpin, +{ let mut packet = ClientResponsePlaintext::new(); packet .mut_login_crypto_response() @@ -123,70 +85,35 @@ fn client_response(connection: T, challenge: Vec) -> WriteAll let mut buffer = vec![]; let size = 4 + packet.compute_size(); - buffer.write_u32::(size).unwrap(); + as WriteBytesExt>::write_u32::(&mut buffer, size).unwrap(); packet.write_to_vec(&mut buffer).unwrap(); - write_all(connection, buffer) + connection.write_all(&buffer[..]).await?; + Ok(()) } -enum RecvPacket { - Header(ReadExact>>, PhantomData), - Body(ReadExact>>, PhantomData), -} - -fn recv_packet(connection: T, acc: Vec) -> RecvPacket +async fn recv_packet(connection: &mut T, acc: &mut Vec) -> io::Result where - T: Read, + T: AsyncRead + Unpin, M: Message, { - RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData) + let header = read_into_accumulator(connection, 4, acc).await?; + let size = BigEndian::read_u32(header) as usize; + let data = read_into_accumulator(connection, size - 4, acc).await?; + let message = protobuf::parse_from_bytes(data).unwrap(); + Ok(message) } -impl Future for RecvPacket -where - T: Read, - M: Message, -{ - type Item = (T, M, Vec); - type Error = io::Error; - - fn poll(&mut self) -> Poll { - use self::RecvPacket::*; - loop { - *self = match *self { - Header(ref mut read, _) => { - let (connection, header) = try_ready!(read.poll()); - let size = BigEndian::read_u32(header.as_ref()) as usize; - - let acc = header.into_inner(); - let read = read_into_accumulator(connection, size - 4, acc); - RecvPacket::Body(read, PhantomData) - } - - Body(ref mut read, _) => { - let (connection, data) = try_ready!(read.poll()); - let message = protobuf::parse_from_bytes(data.as_ref()).unwrap(); - - let acc = data.into_inner(); - return Ok(Async::Ready((connection, message, acc))); - } - } - } - } -} - -fn read_into_accumulator( - connection: T, +async fn read_into_accumulator<'a, T: AsyncRead + Unpin>( + connection: &mut T, size: usize, - mut acc: Vec, -) -> ReadExact>> { + acc: &'a mut Vec, +) -> io::Result<&'a mut [u8]> { let offset = acc.len(); acc.resize(offset + size, 0); - let mut window = Window::new(acc); - window.set_start(offset); - - read_exact(connection, window) + connection.read_exact(&mut acc[offset..]).await?; + Ok(&mut acc[offset..]) } fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec, Vec, Vec) { diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 72497795..eba64070 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -4,13 +4,12 @@ mod handshake; pub use self::codec::APCodec; pub use self::handshake::handshake; -use futures::{Future, Sink, Stream}; +use futures::{SinkExt, StreamExt}; use protobuf::{self, Message}; use std::io; use std::net::ToSocketAddrs; -use tokio_codec::Framed; -use tokio_core::net::TcpStream; -use tokio_core::reactor::Handle; +use tokio::net::TcpStream; +use tokio_util::codec::Framed; use url::Url; use crate::authentication::Credentials; @@ -20,53 +19,36 @@ use crate::proxytunnel; pub type Transport = Framed; -pub fn connect( - addr: String, - handle: &Handle, - proxy: &Option, -) -> Box> { - let (addr, connect_url) = match *proxy { - Some(ref url) => { - info!("Using proxy \"{}\"", url); - match url.to_socket_addrs().and_then(|mut iter| { - iter.next().ok_or(io::Error::new( +pub async fn connect(addr: String, proxy: &Option) -> io::Result { + let socket = if let Some(proxy) = proxy { + info!("Using proxy \"{}\"", proxy); + let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or_else(|| { + io::Error::new( io::ErrorKind::NotFound, "Can't resolve proxy server address", - )) - }) { - Ok(socket_addr) => (socket_addr, Some(addr)), - Err(error) => return Box::new(futures::future::err(error)), - } - } - None => { - match addr.to_socket_addrs().and_then(|mut iter| { - iter.next().ok_or(io::Error::new( - io::ErrorKind::NotFound, - "Can't resolve server address", - )) - }) { - Ok(socket_addr) => (socket_addr, None), - Err(error) => return Box::new(futures::future::err(error)), - } - } + ) + }) + })?; + let socket = TcpStream::connect(&socket_addr).await?; + proxytunnel::connect(socket, &addr).await? + } else { + let socket_addr = addr.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address") + }) + })?; + TcpStream::connect(&socket_addr).await? }; - let socket = TcpStream::connect(&addr, handle); - if let Some(connect_url) = connect_url { - let connection = socket - .and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake)); - Box::new(connection) - } else { - let connection = socket.and_then(handshake); - Box::new(connection) - } + handshake(socket).await } -pub fn authenticate( - transport: Transport, +pub async fn authenticate( + transport: &mut Transport, credentials: Credentials, - device_id: String, -) -> Box> { + device_id: &str, +) -> io::Result { use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; use crate::protocol::keyexchange::APLoginFailed; @@ -91,41 +73,37 @@ pub fn authenticate( version::short_sha(), version::build_id() )); - packet.mut_system_info().set_device_id(device_id); + packet + .mut_system_info() + .set_device_id(device_id.to_string()); packet.set_version_string(version::version_string()); let cmd = 0xab; let data = packet.write_to_bytes().unwrap(); - Box::new( - transport - .send((cmd, data)) - .and_then(|transport| transport.into_future().map_err(|(err, _stream)| err)) - .and_then(|(packet, transport)| match packet { - Some((0xac, data)) => { - let welcome_data: APWelcome = - protobuf::parse_from_bytes(data.as_ref()).unwrap(); + transport.send((cmd, data)).await?; + let (cmd, data) = transport.next().await.expect("EOF")?; + match cmd { + 0xac => { + let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap(); - let reusable_credentials = Credentials { - username: welcome_data.get_canonical_username().to_owned(), - auth_type: welcome_data.get_reusable_auth_credentials_type(), - auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), - }; + let reusable_credentials = Credentials { + username: welcome_data.get_canonical_username().to_owned(), + auth_type: welcome_data.get_reusable_auth_credentials_type(), + auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), + }; - Ok((transport, reusable_credentials)) - } + Ok(reusable_credentials) + } - Some((0xad, data)) => { - let error_data: APLoginFailed = - protobuf::parse_from_bytes(data.as_ref()).unwrap(); - panic!( - "Authentication failed with reason: {:?}", - error_data.get_error_code() - ) - } + 0xad => { + let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap(); + panic!( + "Authentication failed with reason: {:?}", + error_data.get_error_code() + ) + } - Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd), - None => panic!("EOF"), - }), - ) + _ => panic!("Unexpected packet {:?}", cmd), + } } diff --git a/core/src/diffie_hellman.rs b/core/src/diffie_hellman.rs index dec34a3b..358901be 100644 --- a/core/src/diffie_hellman.rs +++ b/core/src/diffie_hellman.rs @@ -1,12 +1,12 @@ use num_bigint::BigUint; -use num_traits::FromPrimitive; +use once_cell::sync::Lazy; use rand::Rng; use crate::util; -lazy_static! { - pub static ref DH_GENERATOR: BigUint = BigUint::from_u64(0x2).unwrap(); - pub static ref DH_PRIME: BigUint = BigUint::from_bytes_be(&[ +pub static DH_GENERATOR: Lazy = Lazy::new(|| BigUint::from_bytes_be(&[0x02])); +pub static DH_PRIME: Lazy = Lazy::new(|| { + BigUint::from_bytes_be(&[ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc9, 0x0f, 0xda, 0xa2, 0x21, 0x68, 0xc2, 0x34, 0xc4, 0xc6, 0x62, 0x8b, 0x80, 0xdc, 0x1c, 0xd1, 0x29, 0x02, 0x4e, 0x08, 0x8a, 0x67, 0xcc, 0x74, 0x02, 0x0b, 0xbe, 0xa6, 0x3b, 0x13, 0x9b, 0x22, 0x51, 0x4a, 0x08, 0x79, 0x8e, @@ -14,8 +14,8 @@ lazy_static! { 0xf2, 0x5f, 0x14, 0x37, 0x4f, 0xe1, 0x35, 0x6d, 0x6d, 0x51, 0xc2, 0x45, 0xe4, 0x85, 0xb5, 0x76, 0x62, 0x5e, 0x7e, 0xc6, 0xf4, 0x4c, 0x42, 0xe9, 0xa6, 0x3a, 0x36, 0x20, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - ]); -} + ]) +}); pub struct DHLocalKeys { private_key: BigUint, diff --git a/core/src/keymaster.rs b/core/src/keymaster.rs index f2d7b772..87b3f1e3 100644 --- a/core/src/keymaster.rs +++ b/core/src/keymaster.rs @@ -1,8 +1,4 @@ -use futures::Future; -use serde_json; - -use crate::mercury::MercuryError; -use crate::session::Session; +use crate::{mercury::MercuryError, session::Session}; #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -13,20 +9,16 @@ pub struct Token { pub scope: Vec, } -pub fn get_token( +pub async fn get_token( session: &Session, client_id: &str, scopes: &str, -) -> Box> { +) -> Result { let url = format!( "hm://keymaster/token/authenticated?client_id={}&scope={}", client_id, scopes ); - Box::new(session.mercury().get(url).map(move |response| { - let data = response.payload.first().expect("Empty payload"); - let data = String::from_utf8(data.clone()).unwrap(); - let token: Token = serde_json::from_str(&data).unwrap(); - - token - })) + let response = session.mercury().get(url).await?; + let data = response.payload.first().expect("Empty payload"); + serde_json::from_slice(data.as_ref()).map_err(|_| MercuryError) } diff --git a/core/src/lib.rs b/core/src/lib.rs index c65878c2..65f6f81b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,27 +1,23 @@ -#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] +#![allow(clippy::unused_io_amount)] -#[macro_use] -extern crate error_chain; -#[macro_use] -extern crate futures; -#[macro_use] -extern crate lazy_static; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; - +#[macro_use] +extern crate pin_project; extern crate aes; extern crate base64; extern crate byteorder; extern crate bytes; +extern crate futures; extern crate hmac; extern crate httparse; extern crate hyper; -extern crate hyper_proxy; extern crate num_bigint; extern crate num_integer; extern crate num_traits; +extern crate once_cell; extern crate pbkdf2; extern crate protobuf; extern crate rand; @@ -29,9 +25,8 @@ extern crate serde; extern crate serde_json; extern crate sha1; extern crate shannon; -extern crate tokio_codec; -extern crate tokio_core; -extern crate tokio_io; +extern crate tokio; +extern crate tokio_util; extern crate url; extern crate uuid; @@ -39,13 +34,14 @@ extern crate librespot_protocol as protocol; #[macro_use] mod component; -mod apresolve; + +pub mod apresolve; pub mod audio_key; pub mod authentication; pub mod cache; pub mod channel; pub mod config; -mod connection; +pub mod connection; pub mod diffie_hellman; pub mod keymaster; pub mod mercury; diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 20e3f0db..e77b4a45 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -1,14 +1,14 @@ use crate::protocol; use crate::util::url_encode; +use crate::util::SeqGenerator; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::sync::{mpsc, oneshot}; -use futures::{Async, Future, Poll}; -use protobuf; -use std::collections::HashMap; -use std::mem; - -use crate::util::SeqGenerator; +use futures::{ + channel::{mpsc, oneshot}, + Future, +}; +use std::{collections::HashMap, task::Poll}; +use std::{mem, pin::Pin, task::Context}; mod types; pub use self::types::*; @@ -31,17 +31,17 @@ pub struct MercuryPending { callback: Option>>, } -pub struct MercuryFuture(oneshot::Receiver>); -impl Future for MercuryFuture { - type Item = T; - type Error = MercuryError; +#[pin_project] +pub struct MercuryFuture(#[pin] oneshot::Receiver>); - fn poll(&mut self) -> Poll { - match self.0.poll() { - Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(oneshot::Canceled) => Err(MercuryError), +impl Future for MercuryFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project().0.poll(cx) { + Poll::Ready(Ok(x)) => Poll::Ready(x), + Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)), + Poll::Pending => Poll::Pending, } } } @@ -98,46 +98,46 @@ impl MercuryManager { MercurySender::new(self.clone(), uri.into()) } - pub fn subscribe>( + pub async fn subscribe>( &self, uri: T, - ) -> Box, Error = MercuryError>> - { + ) -> Result, MercuryError> { let uri = uri.into(); - let request = self.request(MercuryRequest { - method: MercuryMethod::SUB, - uri: uri.clone(), - content_type: None, - payload: Vec::new(), - }); + let response = self + .request(MercuryRequest { + method: MercuryMethod::SUB, + uri: uri.clone(), + content_type: None, + payload: Vec::new(), + }) + .await?; + + let (tx, rx) = mpsc::unbounded(); let manager = self.clone(); - Box::new(request.map(move |response| { - let (tx, rx) = mpsc::unbounded(); - manager.lock(move |inner| { - if !inner.invalid { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if response.payload.len() > 0 { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); + manager.lock(move |inner| { + if !inner.invalid { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if !response.payload.is_empty() { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); - debug!("subscribed sub_uri={}", sub_uri); + debug!("subscribed sub_uri={}", sub_uri); - inner.subscriptions.push((sub_uri, tx.clone())); - } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); + inner.subscriptions.push((sub_uri, tx.clone())); } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); } - }); + } + }); - rx - })) + Ok(rx) } pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { @@ -193,7 +193,7 @@ impl MercuryManager { let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); let response = MercuryResponse { - uri: url_encode(header.get_uri()).to_owned(), + uri: url_encode(header.get_uri()), status_code: header.get_status_code(), payload: pending.parts, }; diff --git a/core/src/mercury/sender.rs b/core/src/mercury/sender.rs index f00235ef..860c2f33 100644 --- a/core/src/mercury/sender.rs +++ b/core/src/mercury/sender.rs @@ -1,5 +1,5 @@ -use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend}; -use std::collections::VecDeque; +use futures::Sink; +use std::{collections::VecDeque, pin::Pin, task::Context}; use super::*; @@ -30,27 +30,38 @@ impl Clone for MercurySender { } } -impl Sink for MercurySender { - type SinkItem = Vec; - type SinkError = MercuryError; +impl Sink> for MercurySender { + type Error = MercuryError; - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - let task = self.mercury.send(self.uri.clone(), item); - self.pending.push_back(task); - Ok(AsyncSink::Ready) + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.pending.front_mut() { Some(task) => { - try_ready!(task.poll()); + match Pin::new(task).poll(cx) { + Poll::Ready(Err(x)) => return Poll::Ready(Err(x)), + Poll::Pending => return Poll::Pending, + _ => (), + }; } None => { - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } } self.pending.pop_front(); } } + + fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { + let task = self.mercury.send(self.uri.clone(), item); + self.pending.push_back(task); + Ok(()) + } } diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index b1363846..508de7f8 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -1,110 +1,45 @@ use std::io; -use std::str::FromStr; -use futures::{Async, Future, Poll}; -use httparse; use hyper::Uri; -use tokio_io::io::{read, write_all, Read, Window, WriteAll}; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -pub struct ProxyTunnel { - state: ProxyState, -} - -enum ProxyState { - ProxyConnect(WriteAll>), - ProxyResponse(Read>>), -} - -pub fn connect(connection: T, connect_url: &str) -> ProxyTunnel { - let proxy = proxy_connect(connection, connect_url); - ProxyTunnel { - state: ProxyState::ProxyConnect(proxy), - } -} - -impl Future for ProxyTunnel { - type Item = T; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - use self::ProxyState::*; - loop { - self.state = match self.state { - ProxyConnect(ref mut write) => { - let (connection, mut accumulator) = try_ready!(write.poll()); - - let capacity = accumulator.capacity(); - accumulator.resize(capacity, 0); - let window = Window::new(accumulator); - - let read = read(connection, window); - ProxyResponse(read) - } - - ProxyResponse(ref mut read_f) => { - let (connection, mut window, bytes_read) = try_ready!(read_f.poll()); - - if bytes_read == 0 { - return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); - } - - let data_end = window.start() + bytes_read; - - let buf = window.get_ref()[0..data_end].to_vec(); - let mut headers = [httparse::EMPTY_HEADER; 16]; - let mut response = httparse::Response::new(&mut headers); - let status = match response.parse(&buf) { - Ok(status) => status, - Err(err) => { - return Err(io::Error::new(io::ErrorKind::Other, err.to_string())); - } - }; - - if status.is_complete() { - if let Some(code) = response.code { - if code == 200 { - // Proxy says all is well - return Ok(Async::Ready(connection)); - } else { - let reason = response.reason.unwrap_or("no reason"); - let msg = format!("Proxy responded with {}: {}", code, reason); - - return Err(io::Error::new(io::ErrorKind::Other, msg)); - } - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "Malformed response from proxy", - )); - } - } else { - if data_end >= window.end() { - // Allocate some more buffer space - let newsize = data_end + 100; - window.get_mut().resize(newsize, 0); - window.set_end(newsize); - } - // We did not get a full header - window.set_start(data_end); - let read = read(connection, window); - ProxyResponse(read) - } - } - } - } - } -} - -fn proxy_connect(connection: T, connect_url: &str) -> WriteAll> { - let uri = Uri::from_str(connect_url).unwrap(); - let buffer = format!( +pub async fn connect( + mut connection: T, + connect_url: &str, +) -> io::Result { + let uri = connect_url.parse::().unwrap(); + let mut buffer = format!( "CONNECT {0}:{1} HTTP/1.1\r\n\ \r\n", - uri.host().expect(&format!("No host in {}", uri)), - uri.port().expect(&format!("No port in {}", uri)) + uri.host().unwrap_or_else(|| panic!("No host in {}", uri)), + uri.port().unwrap_or_else(|| panic!("No port in {}", uri)) ) .into_bytes(); + connection.write_all(buffer.as_ref()).await?; - write_all(connection, buffer) + buffer.clear(); + connection.read_to_end(&mut buffer).await?; + if buffer.is_empty() { + return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); + } + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + + response + .parse(&buffer[..]) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + + match response.code { + Some(200) => Ok(connection), // Proxy says all is well + Some(code) => { + let reason = response.reason.unwrap_or("no reason"); + let msg = format!("Proxy responded with {}: {}", code, reason); + Err(io::Error::new(io::ErrorKind::Other, msg)) + } + None => Err(io::Error::new( + io::ErrorKind::Other, + "Malformed response from proxy", + )), + } } diff --git a/core/src/session.rs b/core/src/session.rs index 4d86a02b..2def4085 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,20 +1,20 @@ -use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, Weak}; +use std::task::Poll; use std::time::{SystemTime, UNIX_EPOCH}; +use std::{io, pin::Pin, task::Context}; + +use once_cell::sync::OnceCell; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::sync::mpsc; -use futures::{Async, Future, IntoFuture, Poll, Stream}; -use tokio_core::reactor::{Handle, Remote}; +use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt}; use crate::apresolve::apresolve_or_fallback; use crate::audio_key::AudioKeyManager; use crate::authentication::Credentials; use crate::cache::Cache; use crate::channel::ChannelManager; -use crate::component::Lazy; use crate::config::SessionConfig; use crate::connection; use crate::mercury::MercuryManager; @@ -32,13 +32,11 @@ struct SessionInternal { tx_connection: mpsc::UnboundedSender<(u8, Vec)>, - audio_key: Lazy, - channel: Lazy, - mercury: Lazy, + audio_key: OnceCell, + channel: OnceCell, + mercury: OnceCell, cache: Option>, - handle: Remote, - session_id: usize, } @@ -48,58 +46,34 @@ static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0); pub struct Session(Arc); impl Session { - pub fn connect( + pub async fn connect( config: SessionConfig, credentials: Credentials, cache: Option, - handle: Handle, - ) -> Box> { - let access_point = - apresolve_or_fallback::(&handle, &config.proxy, &config.ap_port); + ) -> io::Result { + let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await; - let handle_ = handle.clone(); - let proxy = config.proxy.clone(); - let connection = access_point.and_then(move |addr| { - info!("Connecting to AP \"{}\"", addr); - connection::connect(addr, &handle_, &proxy) - }); + info!("Connecting to AP \"{}\"", ap); + let mut conn = connection::connect(ap, &config.proxy).await?; - let device_id = config.device_id.clone(); - let authentication = connection.and_then(move |connection| { - connection::authenticate(connection, credentials, device_id) - }); + let reusable_credentials = + connection::authenticate(&mut conn, credentials, &config.device_id).await?; + info!("Authenticated as \"{}\" !", reusable_credentials.username); + if let Some(cache) = &cache { + cache.save_credentials(&reusable_credentials); + } - let result = authentication.map(move |(transport, reusable_credentials)| { - info!("Authenticated as \"{}\" !", reusable_credentials.username); - if let Some(ref cache) = cache { - cache.save_credentials(&reusable_credentials); - } + let session = Session::create(conn, config, cache, reusable_credentials.username); - let (session, task) = Session::create( - &handle, - transport, - config, - cache, - reusable_credentials.username.clone(), - ); - - handle.spawn(task.map_err(|e| { - error!("{:?}", e); - })); - - session - }); - - Box::new(result) + Ok(session) } fn create( - handle: &Handle, transport: connection::Transport, config: SessionConfig, cache: Option, username: String, - ) -> (Session, Box>) { + ) -> Session { let (sink, stream) = transport.split(); let (sender_tx, sender_rx) = mpsc::unbounded(); @@ -120,53 +94,50 @@ impl Session { cache: cache.map(Arc::new), - audio_key: Lazy::new(), - channel: Lazy::new(), - mercury: Lazy::new(), - - handle: handle.remote().clone(), + audio_key: OnceCell::new(), + channel: OnceCell::new(), + mercury: OnceCell::new(), session_id: session_id, })); - let sender_task = sender_rx - .map_err(|e| -> io::Error { panic!(e) }) - .forward(sink) - .map(|_| ()); + let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink); let receiver_task = DispatchTask(stream, session.weak()); - let task = Box::new( - (receiver_task, sender_task) - .into_future() - .map(|((), ())| ()), - ); - - (session, task) + let task = + futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(())); + tokio::spawn(task); + session } pub fn audio_key(&self) -> &AudioKeyManager { - self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) + self.0 + .audio_key + .get_or_init(|| AudioKeyManager::new(self.weak())) } pub fn channel(&self) -> &ChannelManager { - self.0.channel.get(|| ChannelManager::new(self.weak())) + self.0 + .channel + .get_or_init(|| ChannelManager::new(self.weak())) } pub fn mercury(&self) -> &MercuryManager { - self.0.mercury.get(|| MercuryManager::new(self.weak())) + self.0 + .mercury + .get_or_init(|| MercuryManager::new(self.weak())) } pub fn time_delta(&self) -> i64 { self.0.data.read().unwrap().time_delta } - pub fn spawn(&self, f: F) + pub fn spawn(&self, task: T) where - F: FnOnce(&Handle) -> R + Send + 'static, - R: IntoFuture, - R::Future: 'static, + T: Future + Send + 'static, + T::Output: Send + 'static, { - self.0.handle.spawn(f) + tokio::spawn(task); } fn debug_info(&self) { @@ -178,7 +149,7 @@ impl Session { ); } - #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] + #[allow(clippy::match_same_arms)] fn dispatch(&self, cmd: u8, data: Bytes) { match cmd { 0x4 => { @@ -273,35 +244,34 @@ impl Drop for SessionInternal { struct DispatchTask(S, SessionWeak) where - S: Stream; + S: TryStream + Unpin; impl Future for DispatchTask where - S: Stream, - ::Error: ::std::fmt::Debug, + S: TryStream + Unpin, + ::Ok: std::fmt::Debug, { - type Item = (); - type Error = S::Error; + type Output = Result<(), S::Error>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let session = match self.1.try_upgrade() { Some(session) => session, - None => return Ok(Async::Ready(())), + None => return Poll::Ready(Ok(())), }; loop { - let (cmd, data) = match self.0.poll() { - Ok(Async::Ready(Some(t))) => t, - Ok(Async::Ready(None)) => { + let (cmd, data) = match self.0.try_poll_next_unpin(cx) { + Poll::Ready(Some(Ok(t))) => t, + Poll::Ready(None) => { warn!("Connection to server closed."); session.shutdown(); - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => { + Poll::Ready(Some(Err(e))) => { session.shutdown(); - return Err(From::from(e)); + return Poll::Ready(Err(e)); } + Poll::Pending => return Poll::Pending, }; session.dispatch(cmd, data); @@ -311,7 +281,7 @@ where impl Drop for DispatchTask where - S: Stream, + S: TryStream + Unpin, { fn drop(&mut self) { debug!("drop Dispatch");