diff --git a/Cargo.lock b/Cargo.lock index fbad5707..12e2d78d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dasp_sample" version = "0.11.0" @@ -729,12 +743,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" -[[package]] -name = "fixedbitset" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" - [[package]] name = "flate2" version = "1.1.2" @@ -1016,6 +1024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444405bbb1a762387aa22dd569429533b54a1d8759d35d3b64cb39b0293eaa19" dependencies = [ "cfg-if", + "dashmap", "futures-sink", "futures-timer", "futures-util", @@ -1169,6 +1178,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1884,7 +1899,6 @@ dependencies = [ "hyper-util", "librespot-core", "log", - "parking_lot", "tempfile", "thiserror 2.0.16", "tokio", @@ -1939,7 +1953,6 @@ dependencies = [ "num-derive", "num-integer", "num-traits", - "parking_lot", "pbkdf2", "pin-project-lite", "priority-queue", @@ -2044,7 +2057,6 @@ dependencies = [ "librespot-metadata", "log", "ogg", - "parking_lot", "portable-atomic", "portaudio-rs", "rand 0.9.2", @@ -2586,13 +2598,10 @@ version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ - "backtrace", "cfg-if", "libc", - "petgraph", "redox_syscall", "smallvec", - "thread-id", "windows-targets 0.52.6", ] @@ -2633,16 +2642,6 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" -[[package]] -name = "petgraph" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" -dependencies = [ - "fixedbitset", - "indexmap", -] - [[package]] name = "pin-project-lite" version = "0.2.16" @@ -3776,16 +3775,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread-id" -version = "4.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "time" version = "0.3.44" @@ -3855,7 +3844,6 @@ dependencies = [ "io-uring", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "slab", diff --git a/Cargo.toml b/Cargo.toml index bcb3664a..63a5927a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,7 +179,6 @@ tokio = { version = "1", features = [ "macros", "signal", "sync", - "parking_lot", "process", ] } url = "2.2" diff --git a/audio/Cargo.toml b/audio/Cargo.toml index c7d8bd05..3ff3aac1 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -28,7 +28,6 @@ http-body-util = "0.1" hyper = { version = "1.6", features = ["http1", "http2"] } hyper-util = { version = "0.1", features = ["client", "http2"] } log = "0.4" -parking_lot = { version = "0.12", features = ["deadlock_detection"] } tempfile = "3" thiserror = "2" -tokio = { version = "1", features = ["macros", "parking_lot", "sync"] } +tokio = { version = "1", features = ["macros", "sync"] } diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 781e8a32..271f5b64 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -8,13 +8,14 @@ use std::{ Arc, OnceLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, + sync::{Condvar, Mutex}, time::Duration, }; use futures_util::{StreamExt, TryFutureExt, future::IntoStream}; use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE}; use hyper_util::client::legacy::ResponseFuture; -use parking_lot::{Condvar, Mutex}; + use tempfile::NamedTempFile; use thiserror::Error; use tokio::sync::{Semaphore, mpsc, oneshot}; @@ -27,6 +28,8 @@ use crate::range_set::{Range, RangeSet}; pub type AudioFileResult = Result<(), librespot_core::Error>; +const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned"; + #[derive(Error, Debug)] pub enum AudioFileError { #[error("other end of channel disconnected")] @@ -163,7 +166,10 @@ impl StreamLoaderController { pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { - let download_status = shared.download_status.lock(); + let download_status = shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); range.length <= download_status @@ -214,19 +220,28 @@ impl StreamLoaderController { self.fetch(range); if let Some(ref shared) = self.stream_shared { - let mut download_status = shared.download_status.lock(); + let mut download_status = shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); let download_timeout = AudioFetchParams::get().download_timeout; - while range.length - > download_status - .downloaded - .contained_length_from_value(range.start) - { - if shared - .cond - .wait_for(&mut download_status, download_timeout) - .timed_out() + loop { + if range.length + <= download_status + .downloaded + .contained_length_from_value(range.start) { + break; + } + + let (new_download_status, wait_result) = shared + .cond + .wait_timeout(download_status, download_timeout) + .expect(DOWNLOAD_STATUS_POISON_MSG); + + download_status = new_download_status; + if wait_result.timed_out() { return Err(AudioFileError::WaitTimeout.into()); } @@ -558,7 +573,11 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); - let mut download_status = self.shared.download_status.lock(); + let mut download_status = self + .shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); @@ -571,12 +590,14 @@ impl Read for AudioFileStreaming { let download_timeout = AudioFetchParams::get().download_timeout; while !download_status.downloaded.contains(offset) { - if self + let (new_download_status, wait_result) = self .shared .cond - .wait_for(&mut download_status, download_timeout) - .timed_out() - { + .wait_timeout(download_status, download_timeout) + .expect(DOWNLOAD_STATUS_POISON_MSG); + + download_status = new_download_status; + if wait_result.timed_out() { return Err(io::Error::new( io::ErrorKind::TimedOut, Error::deadline_exceeded(AudioFileError::WaitTimeout), @@ -619,6 +640,7 @@ impl Seek for AudioFileStreaming { .shared .download_status .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG) .downloaded .contains(requested_pos as usize); diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 3d7dfa64..4c894cf6 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -33,6 +33,7 @@ enum ReceivedData { } const ONE_SECOND: Duration = Duration::from_secs(1); +const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned"; async fn receive_data( shared: Arc, @@ -124,7 +125,10 @@ async fn receive_data( if bytes_remaining > 0 { { let missing_range = Range::new(offset, bytes_remaining); - let mut download_status = shared.download_status.lock(); + let mut download_status = shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); download_status.requested.subtract_range(&missing_range); shared.cond.notify_all(); } @@ -189,7 +193,11 @@ impl AudioFileFetch { // The iteration that follows spawns streamers fast, without awaiting them, // so holding the lock for the entire scope of this function should be faster // then locking and unlocking multiple times. - let mut download_status = self.shared.download_status.lock(); + let mut download_status = self + .shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); @@ -227,7 +235,11 @@ impl AudioFileFetch { let mut missing_data = RangeSet::new(); missing_data.add_range(&Range::new(0, self.shared.file_size)); { - let download_status = self.shared.download_status.lock(); + let download_status = self + .shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); missing_data.subtract_range_set(&download_status.downloaded); missing_data.subtract_range_set(&download_status.requested); } @@ -349,7 +361,11 @@ impl AudioFileFetch { let received_range = Range::new(data.offset, data.data.len()); let full = { - let mut download_status = self.shared.download_status.lock(); + let mut download_status = self + .shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); download_status.downloaded.add_range(&received_range); self.shared.cond.notify_all(); @@ -415,7 +431,10 @@ pub(super) async fn audio_file_fetch( initial_request.offset + initial_request.length, ); - let mut download_status = shared.download_status.lock(); + let mut download_status = shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); download_status.requested.add_range(&requested_range); } @@ -466,7 +485,11 @@ pub(super) async fn audio_file_fetch( if fetch.shared.is_download_streaming() && fetch.has_download_slots_available() { let bytes_pending: usize = { - let download_status = fetch.shared.download_status.lock(); + let download_status = fetch + .shared + .download_status + .lock() + .expect(DOWNLOAD_STATUS_POISON_MSG); download_status .requested diff --git a/connect/Cargo.toml b/connect/Cargo.toml index b0dd7859..08d24f66 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -28,6 +28,6 @@ protobuf = "3.7" rand = { version = "0.9", default-features = false, features = ["small_rng"] } serde_json = "1.0" thiserror = "2" -tokio = { version = "1", features = ["macros", "parking_lot", "sync"] } +tokio = { version = "1", features = ["macros", "sync"] } tokio-stream = { version = "0.1", default-features = false } uuid = { version = "1.18", default-features = false, features = ["v4"] } diff --git a/core/Cargo.toml b/core/Cargo.toml index ccc48b13..4a6dea33 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -56,7 +56,7 @@ futures-util = { version = "0.3", default-features = false, features = [ "bilock", "unstable", ] } -governor = { version = "0.10", default-features = false, features = ["std"] } +governor = { version = "0.10", default-features = false, features = ["std", "dashmap"] } hmac = "0.12" httparse = "1.10" http = "1.3" @@ -80,7 +80,6 @@ num-bigint = "0.4" num-derive = "0.4" num-integer = "0.1" num-traits = "0.2" -parking_lot = { version = "0.12", features = ["deadlock_detection"] } pbkdf2 = { version = "0.12", default-features = false, features = ["hmac"] } pin-project-lite = "0.2" priority-queue = "2.5" @@ -100,7 +99,6 @@ tokio = { version = "1", features = [ "io-util", "macros", "net", - "parking_lot", "rt", "sync", "time", @@ -119,4 +117,4 @@ vergen-gitcl = { version = "1.0", default-features = false, features = [ ] } [dev-dependencies] -tokio = { version = "1", features = ["macros", "parking_lot"] } +tokio = { version = "1", features = ["macros"] } diff --git a/core/src/cache.rs b/core/src/cache.rs index 2d2ef53d..15e35d21 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -4,16 +4,17 @@ use std::{ fs::{self, File}, io::{self, Read, Write}, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, time::SystemTime, }; -use parking_lot::Mutex; use priority_queue::PriorityQueue; use thiserror::Error; use crate::{Error, FileId, authentication::Credentials, error::ErrorKind}; +const CACHE_LIMITER_POISON_MSG: &str = "cache limiter mutex should not be poisoned"; + #[derive(Debug, Error)] pub enum CacheError { #[error("audio cache location is not configured")] @@ -189,15 +190,24 @@ impl FsSizeLimiter { } fn add(&self, file: &Path, size: u64) { - self.limiter.lock().add(file, size, SystemTime::now()) + self.limiter + .lock() + .expect(CACHE_LIMITER_POISON_MSG) + .add(file, size, SystemTime::now()) } fn touch(&self, file: &Path) -> bool { - self.limiter.lock().update(file, SystemTime::now()) + self.limiter + .lock() + .expect(CACHE_LIMITER_POISON_MSG) + .update(file, SystemTime::now()) } fn remove(&self, file: &Path) -> bool { - self.limiter.lock().remove(file) + self.limiter + .lock() + .expect(CACHE_LIMITER_POISON_MSG) + .remove(file) } fn prune_internal Option>(mut pop: F) -> Result<(), Error> { @@ -232,7 +242,7 @@ impl FsSizeLimiter { } fn prune(&self) -> Result<(), Error> { - Self::prune_internal(|| self.limiter.lock().pop()) + Self::prune_internal(|| self.limiter.lock().expect(CACHE_LIMITER_POISON_MSG).pop()) } fn new(path: &Path, limit: u64) -> Result { diff --git a/core/src/component.rs b/core/src/component.rs index ebe42e8d..75387ae5 100644 --- a/core/src/component.rs +++ b/core/src/component.rs @@ -1,20 +1,23 @@ +pub(crate) const COMPONENT_POISON_MSG: &str = "component mutex should not be poisoned"; + macro_rules! component { ($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => { #[derive(Clone)] - pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::parking_lot::Mutex<$inner>)>); + pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::std::sync::Mutex<$inner>)>); impl $name { #[allow(dead_code)] pub(crate) fn new(session: $crate::session::SessionWeak) -> $name { debug!(target:"librespot::component", "new {}", stringify!($name)); - $name(::std::sync::Arc::new((session, ::parking_lot::Mutex::new($inner { + $name(::std::sync::Arc::new((session, ::std::sync::Mutex::new($inner { $($key : $value,)* })))) } #[allow(dead_code)] fn lock R, R>(&self, f: F) -> R { - let mut inner = (self.0).1.lock(); + let mut inner = (self.0).1.lock() + .expect($crate::component::COMPONENT_POISON_MSG); f(&mut inner) } diff --git a/core/src/dealer/mod.rs b/core/src/dealer/mod.rs index 4f738403..63ee6e72 100644 --- a/core/src/dealer/mod.rs +++ b/core/src/dealer/mod.rs @@ -6,7 +6,7 @@ use std::{ iter, pin::Pin, sync::{ - Arc, + Arc, Mutex, atomic::{self, AtomicBool}, }, task::Poll, @@ -15,7 +15,6 @@ use std::{ use futures_core::{Future, Stream}; use futures_util::{SinkExt, StreamExt, future::join_all}; -use parking_lot::Mutex; use thiserror::Error; use tokio::{ select, @@ -57,6 +56,11 @@ const PING_TIMEOUT: Duration = Duration::from_secs(3); const RECONNECT_INTERVAL: Duration = Duration::from_secs(10); +const DEALER_REQUEST_HANDLERS_POISON_MSG: &str = + "dealer request handlers mutex should not be poisoned"; +const DEALER_MESSAGE_HANDLERS_POISON_MSG: &str = + "dealer message handlers mutex should not be poisoned"; + struct Response { pub success: bool, } @@ -350,6 +354,7 @@ impl DealerShared { if self .message_handlers .lock() + .expect(DEALER_MESSAGE_HANDLERS_POISON_MSG) .retain(split, &mut |tx| tx.send(msg.clone()).is_ok()) { return; @@ -387,7 +392,10 @@ impl DealerShared { return; }; - let handler_map = self.request_handlers.lock(); + let handler_map = self + .request_handlers + .lock() + .expect(DEALER_REQUEST_HANDLERS_POISON_MSG); if let Some(handler) = handler_map.get(split) { handler.handle_request(payload_request, responder); @@ -425,21 +433,51 @@ impl Dealer { where H: RequestHandler, { - add_handler(&mut self.shared.request_handlers.lock(), uri, handler) + add_handler( + &mut self + .shared + .request_handlers + .lock() + .expect(DEALER_REQUEST_HANDLERS_POISON_MSG), + uri, + handler, + ) } pub fn remove_handler(&self, uri: &str) -> Option> { - remove_handler(&mut self.shared.request_handlers.lock(), uri) + remove_handler( + &mut self + .shared + .request_handlers + .lock() + .expect(DEALER_REQUEST_HANDLERS_POISON_MSG), + uri, + ) } pub fn subscribe(&self, uris: &[&str]) -> Result { - subscribe(&mut self.shared.message_handlers.lock(), uris) + subscribe( + &mut self + .shared + .message_handlers + .lock() + .expect(DEALER_MESSAGE_HANDLERS_POISON_MSG), + uris, + ) } pub fn handles(&self, uri: &str) -> bool { handles( - &self.shared.request_handlers.lock(), - &self.shared.message_handlers.lock(), + &self + .shared + .request_handlers + .lock() + .expect(DEALER_REQUEST_HANDLERS_POISON_MSG), + &self + .shared + .message_handlers + .lock() + .expect(DEALER_MESSAGE_HANDLERS_POISON_MSG), uri, ) } diff --git a/core/src/http_client.rs b/core/src/http_client.rs index 9d5d54fc..7d7c6d8b 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, sync::OnceLock, time::{Duration, Instant}, }; @@ -7,7 +6,8 @@ use std::{ use bytes::Bytes; use futures_util::{FutureExt, future::IntoStream}; use governor::{ - Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, + Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware, + state::keyed::DashMapStateStore, }; use http::{Uri, header::HeaderValue}; use http_body_util::{BodyExt, Full}; @@ -18,7 +18,6 @@ use hyper_util::{ rt::TokioExecutor, }; use nonzero_ext::nonzero; -use parking_lot::Mutex; use thiserror::Error; use url::Url; @@ -102,8 +101,7 @@ pub struct HttpClient { // while the DashMap variant is more performant, our level of concurrency // is pretty low so we can save pulling in that extra dependency - rate_limiter: - RateLimiter>, MonotonicClock, NoOpMiddleware>, + rate_limiter: RateLimiter, MonotonicClock, NoOpMiddleware>, } impl HttpClient { diff --git a/core/src/session.rs b/core/src/session.rs index 91c41781..333678fd 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -4,8 +4,7 @@ use std::{ io, pin::Pin, process::exit, - sync::OnceLock, - sync::{Arc, Weak}, + sync::{Arc, OnceLock, RwLock, Weak}, task::{Context, Poll}, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -34,7 +33,6 @@ use futures_core::TryStream; use futures_util::StreamExt; use librespot_protocol::authentication::AuthenticationType; use num_traits::FromPrimitive; -use parking_lot::RwLock; use pin_project_lite::pin_project; use quick_xml::events::Event; use thiserror::Error; @@ -45,6 +43,8 @@ use tokio::{ use tokio_stream::wrappers::UnboundedReceiverStream; use uuid::Uuid; +const SESSION_DATA_POISON_MSG: &str = "session data rwlock should not be poisoned"; + #[derive(Debug, Error)] pub enum SessionError { #[error(transparent)] @@ -338,7 +338,11 @@ impl Session { } pub fn time_delta(&self) -> i64 { - self.0.data.read().time_delta + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .time_delta } pub fn spawn(&self, task: T) @@ -388,15 +392,32 @@ impl Session { // you need more fields at once, in which case this can spare multiple `read` // locks. pub fn user_data(&self) -> UserData { - self.0.data.read().user_data.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .clone() } pub fn session_id(&self) -> String { - self.0.data.read().session_id.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .session_id + .clone() } pub fn set_session_id(&self, session_id: &str) { - session_id.clone_into(&mut self.0.data.write().session_id); + session_id.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .session_id, + ); } pub fn device_id(&self) -> &str { @@ -404,63 +425,155 @@ impl Session { } pub fn client_id(&self) -> String { - self.0.data.read().client_id.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .client_id + .clone() } pub fn set_client_id(&self, client_id: &str) { - client_id.clone_into(&mut self.0.data.write().client_id); + client_id.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .client_id, + ); } pub fn client_name(&self) -> String { - self.0.data.read().client_name.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .client_name + .clone() } pub fn set_client_name(&self, client_name: &str) { - client_name.clone_into(&mut self.0.data.write().client_name); + client_name.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .client_name, + ); } pub fn client_brand_name(&self) -> String { - self.0.data.read().client_brand_name.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .client_brand_name + .clone() } pub fn set_client_brand_name(&self, client_brand_name: &str) { - client_brand_name.clone_into(&mut self.0.data.write().client_brand_name); + client_brand_name.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .client_brand_name, + ); } pub fn client_model_name(&self) -> String { - self.0.data.read().client_model_name.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .client_model_name + .clone() } pub fn set_client_model_name(&self, client_model_name: &str) { - client_model_name.clone_into(&mut self.0.data.write().client_model_name); + client_model_name.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .client_model_name, + ); } pub fn connection_id(&self) -> String { - self.0.data.read().connection_id.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .connection_id + .clone() } pub fn set_connection_id(&self, connection_id: &str) { - connection_id.clone_into(&mut self.0.data.write().connection_id); + connection_id.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .connection_id, + ); } pub fn username(&self) -> String { - self.0.data.read().user_data.canonical_username.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .canonical_username + .clone() } pub fn set_username(&self, username: &str) { - username.clone_into(&mut self.0.data.write().user_data.canonical_username); + username.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .canonical_username, + ); } pub fn auth_data(&self) -> Vec { - self.0.data.read().auth_data.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .auth_data + .clone() } pub fn set_auth_data(&self, auth_data: &[u8]) { - auth_data.clone_into(&mut self.0.data.write().auth_data); + auth_data.clone_into( + &mut self + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .auth_data, + ); } pub fn country(&self) -> String { - self.0.data.read().user_data.country.clone() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .country + .clone() } pub fn filter_explicit_content(&self) -> bool { @@ -489,6 +602,7 @@ impl Session { self.0 .data .write() + .expect(SESSION_DATA_POISON_MSG) .user_data .attributes .insert(key.to_owned(), value.to_owned()) @@ -497,11 +611,24 @@ impl Session { pub fn set_user_attributes(&self, attributes: UserAttributes) { Self::check_catalogue(&attributes); - self.0.data.write().user_data.attributes.extend(attributes) + self.0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .attributes + .extend(attributes) } pub fn get_user_attribute(&self, key: &str) -> Option { - self.0.data.read().user_data.attributes.get(key).cloned() + self.0 + .data + .read() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .attributes + .get(key) + .cloned() } fn weak(&self) -> SessionWeak { @@ -510,13 +637,13 @@ impl Session { pub fn shutdown(&self) { debug!("Shutdown: Invalidating session"); - self.0.data.write().invalid = true; + self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true; self.mercury().shutdown(); self.channel().shutdown(); } pub fn is_invalid(&self) -> bool { - self.0.data.read().invalid + self.0.data.read().expect(SESSION_DATA_POISON_MSG).invalid } } @@ -643,7 +770,7 @@ where .unwrap_or(Duration::ZERO) .as_secs() as i64; { - let mut data = session.0.data.write(); + let mut data = session.0.data.write().expect(SESSION_DATA_POISON_MSG); data.time_delta = server_timestamp.saturating_sub(timestamp); } @@ -668,7 +795,13 @@ where Some(CountryCode) => { let country = String::from_utf8(data.as_ref().to_owned())?; info!("Country: {country:?}"); - session.0.data.write().user_data.country = country; + session + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .country = country; Ok(()) } Some(StreamChunkRes) | Some(ChannelError) => session.channel().dispatch(cmd, data), @@ -713,7 +846,13 @@ where trace!("Received product info: {user_attributes:#?}"); Session::check_catalogue(&user_attributes); - session.0.data.write().user_data.attributes = user_attributes; + session + .0 + .data + .write() + .expect(SESSION_DATA_POISON_MSG) + .user_data + .attributes = user_attributes; Ok(()) } Some(SecretBlock) diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index 9ec64939..2f86d5ac 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -51,7 +51,7 @@ serde_repr = "0.1" serde_json = "1.0" sha1 = "0.10" thiserror = "2" -tokio = { version = "1", features = ["parking_lot", "sync", "rt"] } +tokio = { version = "1", features = ["sync", "rt"] } zbus = { version = "5", default-features = false, features = [ "tokio", ], optional = true } @@ -59,4 +59,4 @@ zbus = { version = "5", default-features = false, features = [ [dev-dependencies] futures = "0.3" hex = "0.4" -tokio = { version = "1", features = ["macros", "parking_lot", "rt"] } +tokio = { version = "1", features = ["macros", "rt"] } diff --git a/playback/Cargo.toml b/playback/Cargo.toml index d780cbda..2001c680 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -53,11 +53,10 @@ librespot-metadata = { version = "0.7.1", path = "../metadata", default-features futures-util = { version = "0.3", default-features = false, features = ["std"] } log = "0.4" -parking_lot = { version = "0.12", features = ["deadlock_detection"] } portable-atomic = "1" shell-words = "1.1" thiserror = "2" -tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] } +tokio = { version = "1", features = ["rt-multi-thread", "sync"] } zerocopy = { version = "0.8", features = ["derive"] } # Backends diff --git a/playback/src/audio_backend/gstreamer.rs b/playback/src/audio_backend/gstreamer.rs index b9087e3d..f41d4333 100644 --- a/playback/src/audio_backend/gstreamer.rs +++ b/playback/src/audio_backend/gstreamer.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, Mutex}; + use gstreamer::{ State, event::{FlushStart, FlushStop}, @@ -8,8 +10,7 @@ use gstreamer as gst; use gstreamer_app as gst_app; use gstreamer_audio as gst_audio; -use parking_lot::Mutex; -use std::sync::Arc; +const GSTREAMER_ASYNC_ERROR_POISON_MSG: &str = "gstreamer async error mutex should not be poisoned"; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; @@ -97,7 +98,9 @@ impl Open for GstreamerSink { gst::MessageView::Eos(_) => { println!("gst signaled end of stream"); - let mut async_error_storage = async_error_clone.lock(); + let mut async_error_storage = async_error_clone + .lock() + .expect(GSTREAMER_ASYNC_ERROR_POISON_MSG); *async_error_storage = Some(String::from("gst signaled end of stream")); } gst::MessageView::Error(err) => { @@ -108,7 +111,9 @@ impl Open for GstreamerSink { err.debug() ); - let mut async_error_storage = async_error_clone.lock(); + let mut async_error_storage = async_error_clone + .lock() + .expect(GSTREAMER_ASYNC_ERROR_POISON_MSG); *async_error_storage = Some(format!( "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), @@ -138,7 +143,10 @@ impl Open for GstreamerSink { impl Sink for GstreamerSink { fn start(&mut self) -> SinkResult<()> { - *self.async_error.lock() = None; + *self + .async_error + .lock() + .expect(GSTREAMER_ASYNC_ERROR_POISON_MSG) = None; self.appsrc.send_event(FlushStop::new(true)); self.bufferpool .set_active(true) @@ -150,7 +158,10 @@ impl Sink for GstreamerSink { } fn stop(&mut self) -> SinkResult<()> { - *self.async_error.lock() = None; + *self + .async_error + .lock() + .expect(GSTREAMER_ASYNC_ERROR_POISON_MSG) = None; self.appsrc.send_event(FlushStart::new()); self.pipeline .set_state(State::Paused) @@ -173,7 +184,11 @@ impl Drop for GstreamerSink { impl SinkAsBytes for GstreamerSink { #[inline] fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { - if let Some(async_error) = &*self.async_error.lock() { + if let Some(async_error) = &*self + .async_error + .lock() + .expect(GSTREAMER_ASYNC_ERROR_POISON_MSG) + { return Err(SinkError::OnWrite(async_error.to_string())); } diff --git a/playback/src/player.rs b/playback/src/player.rs index 886c9cf3..a4a03ca3 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -6,6 +6,7 @@ use std::{ mem, pin::Pin, process::exit, + sync::Mutex, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -32,7 +33,6 @@ use futures_util::{ stream::futures_unordered::FuturesUnordered, }; use librespot_metadata::track::Tracks; -use parking_lot::Mutex; use symphonia::core::io::MediaSource; use tokio::sync::{mpsc, oneshot}; @@ -46,6 +46,8 @@ pub const PCM_AT_0DBFS: f64 = 1.0; // otherwise expect in Vorbis comments. This packet isn't well-formed and players may balk at it. const SPOTIFY_OGG_HEADER_END: u64 = 0xa7; +const LOAD_HANDLES_POISON_MSG: &str = "load handles mutex should not be poisoned"; + pub type PlayerResult = Result<(), Error>; pub struct Player { @@ -2281,11 +2283,11 @@ impl PlayerInternal { let _ = result_tx.send(data); } - let mut load_handles = load_handles_clone.lock(); + let mut load_handles = load_handles_clone.lock().expect(LOAD_HANDLES_POISON_MSG); load_handles.remove(&thread::current().id()); }); - let mut load_handles = self.load_handles.lock(); + let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG); load_handles.insert(load_handle.thread().id(), load_handle); result_rx.map_err(|_| ()) @@ -2320,7 +2322,7 @@ impl Drop for PlayerInternal { let handles: Vec> = { // waiting for the thread while holding the mutex would result in a deadlock - let mut load_handles = self.load_handles.lock(); + let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG); load_handles .drain()