diff --git a/Cargo.lock b/Cargo.lock index 55d319d1..43270907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -585,6 +585,12 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.21" @@ -726,6 +732,21 @@ dependencies = [ "system-deps", ] +[[package]] +name = "governor" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87" +dependencies = [ + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "rand", + "smallvec", +] + [[package]] name = "gstreamer" version = "0.18.8" @@ -1404,6 +1425,7 @@ dependencies = [ "form_urlencoded", "futures-core", "futures-util", + "governor", "hex", "hmac", "http", @@ -1413,6 +1435,7 @@ dependencies = [ "hyper-rustls 0.23.0", "librespot-protocol", "log", + "nonzero_ext", "num", "num-bigint", "num-derive", @@ -1709,6 +1732,12 @@ dependencies = [ "memoffset", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.1" @@ -1719,6 +1748,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "ntapi" version = "0.3.7" diff --git a/core/Cargo.toml b/core/Cargo.toml index 476f5216..d0bcda02 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,6 +22,7 @@ dns-sd = { version = "0.1", optional = true } form_urlencoded = "1.0" futures-core = "0.3" futures-util = { version = "0.3", features = ["alloc", "bilock", "sink", "unstable"] } +governor = { version = "0.4", default-features = false, features = ["std", "jitter"] } hex = "0.4" hmac = "0.12" httparse = "1.7" @@ -30,6 +31,7 @@ hyper = { version = "0.14", features = ["client", "http1", "http2", "tcp"] } hyper-proxy = { version = "0.9", default-features = false, features = ["rustls"] } hyper-rustls = { version = "0.23", features = ["http2"] } log = "0.4" +nonzero_ext = "0.3" num = "0.4" num-bigint = { version = "0.4", features = ["rand"] } num-derive = "0.3" diff --git a/core/src/http_client.rs b/core/src/http_client.rs index 4e454f97..ef9af1ac 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -1,7 +1,13 @@ -use std::env::consts::OS; +use std::{env::consts::OS, time::Duration}; use bytes::Bytes; use futures_util::{future::IntoStream, FutureExt}; +use governor::{ + clock::MonotonicClock, + middleware::NoOpMiddleware, + state::{InMemoryState, NotKeyed}, + Jitter, Quota, RateLimiter, +}; use http::{header::HeaderValue, Uri}; use hyper::{ client::{HttpConnector, ResponseFuture}, @@ -10,6 +16,7 @@ use hyper::{ }; use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use nonzero_ext::nonzero; use once_cell::sync::OnceCell; use sysinfo::{System, SystemExt}; use thiserror::Error; @@ -20,6 +27,12 @@ use crate::{ Error, }; +// The 30 seconds interval is documented by Spotify, but the calls per interval +// is a guesstimate and probably subject to licensing (purchasing extra calls) +// and may change at any time. +pub const RATE_LIMIT_INTERVAL: u64 = 30; // seconds +pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300; + #[derive(Debug, Error)] pub enum HttpClientError { #[error("Response status code: {0}")] @@ -74,11 +87,11 @@ impl From for Error { type HyperClient = Client>, Body>; -#[derive(Clone)] pub struct HttpClient { user_agent: HeaderValue, proxy_url: Option, hyper_client: OnceCell, + rate_limiter: RateLimiter, } impl HttpClient { @@ -109,10 +122,18 @@ impl HttpClient { HeaderValue::from_static(FALLBACK_USER_AGENT) }); + let replenish_interval_ns = Duration::from_secs(RATE_LIMIT_INTERVAL).as_nanos() + / RATE_LIMIT_CALLS_PER_INTERVAL as u128; + let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64)) + .expect("replenish interval should be valid") + .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]); + let rate_limiter = RateLimiter::direct(quota); + Self { user_agent, proxy_url: proxy_url.cloned(), hyper_client: OnceCell::new(), + rate_limiter, } } @@ -147,17 +168,54 @@ impl HttpClient { pub async fn request(&self, req: Request) -> Result, Error> { debug!("Requesting {}", req.uri().to_string()); - let request = self.request_fut(req)?; - let response = request.await; + // `Request` does not implement `Clone` because its `Body` may be a single-shot stream. + // As correct as that may be technically, we now need all this boilerplate to clone it + // ourselves, as any `Request` is moved in the loop. + let (parts, body) = req.into_parts(); + let body_as_bytes = hyper::body::to_bytes(body) + .await + .unwrap_or_else(|_| Bytes::new()); - if let Ok(response) = &response { - let code = response.status(); - if code != StatusCode::OK { - return Err(HttpClientError::StatusCode(code).into()); + loop { + let mut req = Request::new(Body::from(body_as_bytes.clone())); + *req.method_mut() = parts.method.clone(); + *req.uri_mut() = parts.uri.clone(); + *req.version_mut() = parts.version; + *req.headers_mut() = parts.headers.clone(); + + // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429 + // Retry-After headers. For example, when there is a service interruption + // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure. + self.rate_limiter + .until_ready_with_jitter(Jitter::up_to(Duration::from_secs(5))) + .await; + + let request = self.request_fut(req)?; + let response = request.await; + + if let Ok(response) = &response { + let code = response.status(); + + if code == StatusCode::TOO_MANY_REQUESTS { + if let Some(retry_after) = response.headers().get("Retry-After") { + if let Ok(retry_after_str) = retry_after.to_str() { + if let Ok(retry_after_secs) = retry_after_str.parse::() { + warn!("Rate limiting, retrying in {} seconds...", retry_after_secs); + let duration = Duration::from_secs(retry_after_secs); + tokio::time::sleep(duration).await; + continue; + } + } + } + } + + if code != StatusCode::OK { + return Err(HttpClientError::StatusCode(code).into()); + } } - } - Ok(response?) + return Ok(response?); + } } pub async fn request_body(&self, req: Request) -> Result { diff --git a/core/src/spclient.rs b/core/src/spclient.rs index 433fa64f..4af0472f 100644 --- a/core/src/spclient.rs +++ b/core/src/spclient.rs @@ -15,7 +15,6 @@ use hyper::{ Body, HeaderMap, Method, Request, }; use protobuf::{Message, ProtobufEnum}; -use rand::Rng; use sha1::{Digest, Sha1}; use sysinfo::{System, SystemExt}; use thiserror::Error; @@ -176,7 +175,7 @@ impl SpClient { return Ok(client_token.access_token); } - trace!("Client token unavailable or expired, requesting new token."); + debug!("Client token unavailable or expired, requesting new token."); let mut request = ClientTokenRequest::new(); request.set_request_type(ClientTokenRequestType::REQUEST_CLIENT_DATA_REQUEST); @@ -270,7 +269,7 @@ impl SpClient { // or are presented a hash cash challenge to solve first Some(ClientTokenResponseType::RESPONSE_GRANTED_TOKEN_RESPONSE) => break message, Some(ClientTokenResponseType::RESPONSE_CHALLENGES_RESPONSE) => { - trace!("Received a hash cash challenge, solving..."); + debug!("Received a hash cash challenge, solving..."); let challenges = message.get_challenges().clone(); let state = challenges.get_state(); @@ -500,16 +499,7 @@ impl SpClient { } } - // When retrying, avoid hammering the Spotify infrastructure by sleeping a while. - // The backoff time is chosen randomly from an ever-increasing range. - let max_seconds = u64::pow(tries as u64, 2) * 3; - let backoff = Duration::from_secs(rand::thread_rng().gen_range(1..=max_seconds)); - warn!( - "Unable to complete API request, waiting {} seconds before retrying...", - backoff.as_secs(), - ); debug!("Error was: {:?}", last_response); - tokio::time::sleep(backoff).await; } last_response