1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-05 02:39:53 +02:00

Merge pull request #674 from Johannesd3/proxy-support

[Tokio migration] Add back hyper-proxy
This commit is contained in:
Ash 2021-03-29 21:00:18 +02:00 committed by GitHub
commit d4dfd4890f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 178 additions and 197 deletions

View file

@ -1,73 +1,68 @@
const AP_FALLBACK: &str = "ap.spotify.com:443";
use std::error::Error;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Method, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde::Deserialize;
use url::Url;
cfg_if! {
if #[cfg(feature = "apresolve")] {
const APRESOLVE_ENDPOINT: &str = "http://apresolve.spotify.com:80";
use super::AP_FALLBACK;
use std::error::Error;
const APRESOLVE_ENDPOINT: &str = "http://apresolve.spotify.com:80";
use hyper::{Body, Client, Method, Request, Uri};
use serde::{Serialize, Deserialize};
use crate::proxytunnel::ProxyTunnel;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData {
ap_list: Vec<String>,
}
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
let port = ap_port.unwrap_or(443);
let req = Request::builder()
.method(Method::GET)
.uri(
APRESOLVE_ENDPOINT
.parse::<Uri>()
.expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let response = if let Some(url) = proxy {
Client::builder()
.build(ProxyTunnel::new(&url.socket_addrs(|| None)?[..])?)
.request(req)
.await?
} else {
Client::new().request(req).await?
};
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
None
}
})
} else {
data.ap_list.into_iter().next()
}
.ok_or("empty AP List")?;
Ok(ap)
}
pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
AP_FALLBACK.into()
})
}
} else {
pub async fn apresolve_or_fallback(_: &Option<Url>, _: &Option<u16>) -> String {
AP_FALLBACK.to_string()
}
}
#[derive(Clone, Debug, Deserialize)]
struct APResolveData {
ap_list: Vec<String>,
}
async fn try_apresolve(
proxy: Option<&Url>,
ap_port: Option<u16>,
) -> Result<String, Box<dyn Error>> {
let port = ap_port.unwrap_or(443);
let mut req = Request::new(Body::empty());
*req.method_mut() = Method::GET;
// panic safety: APRESOLVE_ENDPOINT above is valid url.
*req.uri_mut() = APRESOLVE_ENDPOINT.parse().expect("invalid AP resolve URL");
let response = if let Some(url) = proxy {
// Panic safety: all URLs are valid URIs
let uri = url.to_string().parse().unwrap();
let proxy = Proxy::new(Intercept::All, uri);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
Client::builder()
.build(proxy_connector)
.request(req)
.await?
} else {
Client::new().request(req).await?
};
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
None
}
})
} else {
data.ap_list.into_iter().next()
}
.ok_or("empty AP List")?;
Ok(ap)
}
pub async fn apresolve(proxy: Option<&Url>, ap_port: Option<u16>) -> String {
try_apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
AP_FALLBACK.into()
})
}

View file

@ -58,25 +58,11 @@ impl From<APLoginFailed> for AuthenticationError {
}
}
pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
let socket = if let Some(proxy) = proxy {
info!("Using proxy \"{}\"", proxy);
pub async fn connect(addr: String, proxy: Option<&Url>) -> io::Result<Transport> {
let socket = if let Some(proxy_url) = proxy {
info!("Using proxy \"{}\"", proxy_url);
let mut split = addr.rsplit(':');
let port = split
.next()
.unwrap() // will never panic, split iterator contains at least one element
.parse()
.map_err(|e| {
io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e))
})?;
let host = split
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?;
let socket_addr = proxy.socket_addrs(|| None).and_then(|addrs| {
let socket_addr = proxy_url.socket_addrs(|| None).and_then(|addrs| {
addrs.into_iter().next().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
@ -86,13 +72,34 @@ pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport>
})?;
let socket = TcpStream::connect(&socket_addr).await?;
proxytunnel::connect(socket, host, port).await?
} else {
let socket_addr = addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| {
io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address")
})
let uri = addr.parse::<http::Uri>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Can't parse access point address",
)
})?;
let host = uri.host().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"The access point address contains no hostname",
)
})?;
let port = uri.port().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"The access point address contains no port",
)
})?;
proxytunnel::proxy_connect(socket, host, port.as_str()).await?
} else {
let socket_addr = addr.to_socket_addrs()?.next().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve access point address",
)
})?;
TcpStream::connect(&socket_addr).await?
};

View file

@ -2,15 +2,12 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate cfg_if;
use librespot_protocol as protocol;
#[macro_use]
mod component;
mod apresolve;
pub mod audio_key;
pub mod authentication;
pub mod cache;
@ -25,3 +22,15 @@ pub mod session;
pub mod spotify_id;
pub mod util;
pub mod version;
const AP_FALLBACK: &str = "ap.spotify.com:443";
#[cfg(feature = "apresolve")]
mod apresolve;
#[cfg(not(feature = "apresolve"))]
mod apresolve {
pub async fn apresolve(_: Option<&url::Url>, _: Option<u16>) -> String {
return super::AP_FALLBACK.into();
}
}

View file

@ -2,16 +2,16 @@ use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
pub async fn proxy_connect<T: AsyncRead + AsyncWrite + Unpin>(
mut proxy_connection: T,
connect_host: &str,
connect_port: u16,
connect_port: &str,
) -> io::Result<T> {
let mut buffer = Vec::new();
buffer.extend_from_slice(b"CONNECT ");
buffer.extend_from_slice(connect_host.as_bytes());
buffer.push(b':');
buffer.extend_from_slice(connect_port.to_string().as_bytes());
buffer.extend_from_slice(connect_port.as_bytes());
buffer.extend_from_slice(b" HTTP/1.1\r\n\r\n");
proxy_connection.write_all(buffer.as_ref()).await?;
@ -49,61 +49,7 @@ pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
}
if offset >= buffer.len() {
buffer.resize(buffer.len() * 2, 0);
}
}
}
cfg_if! {
if #[cfg(feature = "apresolve")] {
use std::future::Future;
use std::net::{SocketAddr, ToSocketAddrs};
use std::pin::Pin;
use std::task::Poll;
use hyper::service::Service;
use hyper::Uri;
use tokio::net::TcpStream;
#[derive(Clone)]
pub struct ProxyTunnel {
proxy_addr: SocketAddr,
}
impl ProxyTunnel {
pub fn new<T: ToSocketAddrs>(addr: T) -> io::Result<Self> {
let addr = addr.to_socket_addrs()?.next().ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "No socket address given")
})?;
Ok(Self { proxy_addr: addr })
}
}
impl Service<Uri> for ProxyTunnel {
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, url: Uri) -> Self::Future {
let proxy_addr = self.proxy_addr;
let fut = async move {
let host = url
.host()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?;
let port = url
.port()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?;
let conn = TcpStream::connect(proxy_addr).await?;
connect(conn, host, port.as_u16()).await
};
Box::pin(fut)
}
buffer.resize(buffer.len() + 100, 0);
}
}
}

View file

@ -16,7 +16,7 @@ use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::apresolve::apresolve_or_fallback;
use crate::apresolve::apresolve;
use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials;
use crate::cache::Cache;
@ -67,10 +67,10 @@ impl Session {
credentials: Credentials,
cache: Option<Cache>,
) -> Result<Session, SessionError> {
let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await;
let ap = apresolve(config.proxy.as_ref(), config.ap_port).await;
info!("Connecting to AP \"{}\"", ap);
let mut conn = connection::connect(ap, &config.proxy).await?;
let mut conn = connection::connect(ap, config.proxy.as_ref()).await?;
let reusable_credentials =
connection::authenticate(&mut conn, credentials, &config.device_id).await?;