1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-03 17:59:24 +02:00

refactor: switch from parking_lot to std sync primitives

Replace parking_lot with std::sync::{Mutex, RwLock, Condvar} throughout the
codebase. Update dependencies and code to use poisoning-aware locks, adding
explicit panic messages where necessary. Update governor to use DashMapStateStore
for rate limiting.
This commit is contained in:
Roderick van Domburg 2025-08-20 12:29:53 +02:00
parent 6f473ebc02
commit fba64955c9
No known key found for this signature in database
GPG key ID: 607FA06CB5236AE0
16 changed files with 363 additions and 130 deletions

54
Cargo.lock generated
View file

@ -510,6 +510,20 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "dasp_sample" name = "dasp_sample"
version = "0.11.0" version = "0.11.0"
@ -729,12 +743,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.2" version = "1.1.2"
@ -1016,6 +1024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444405bbb1a762387aa22dd569429533b54a1d8759d35d3b64cb39b0293eaa19" checksum = "444405bbb1a762387aa22dd569429533b54a1d8759d35d3b64cb39b0293eaa19"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"dashmap",
"futures-sink", "futures-sink",
"futures-timer", "futures-timer",
"futures-util", "futures-util",
@ -1169,6 +1178,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.15.5" version = "0.15.5"
@ -1884,7 +1899,6 @@ dependencies = [
"hyper-util", "hyper-util",
"librespot-core", "librespot-core",
"log", "log",
"parking_lot",
"tempfile", "tempfile",
"thiserror 2.0.16", "thiserror 2.0.16",
"tokio", "tokio",
@ -1939,7 +1953,6 @@ dependencies = [
"num-derive", "num-derive",
"num-integer", "num-integer",
"num-traits", "num-traits",
"parking_lot",
"pbkdf2", "pbkdf2",
"pin-project-lite", "pin-project-lite",
"priority-queue", "priority-queue",
@ -2044,7 +2057,6 @@ dependencies = [
"librespot-metadata", "librespot-metadata",
"log", "log",
"ogg", "ogg",
"parking_lot",
"portable-atomic", "portable-atomic",
"portaudio-rs", "portaudio-rs",
"rand 0.9.2", "rand 0.9.2",
@ -2586,13 +2598,10 @@ version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [ dependencies = [
"backtrace",
"cfg-if", "cfg-if",
"libc", "libc",
"petgraph",
"redox_syscall", "redox_syscall",
"smallvec", "smallvec",
"thread-id",
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
@ -2633,16 +2642,6 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.16" version = "0.2.16"
@ -3776,16 +3775,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "thread-id"
version = "4.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.44" version = "0.3.44"
@ -3855,7 +3844,6 @@ dependencies = [
"io-uring", "io-uring",
"libc", "libc",
"mio", "mio",
"parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"slab", "slab",

View file

@ -179,7 +179,6 @@ tokio = { version = "1", features = [
"macros", "macros",
"signal", "signal",
"sync", "sync",
"parking_lot",
"process", "process",
] } ] }
url = "2.2" url = "2.2"

View file

@ -28,7 +28,6 @@ http-body-util = "0.1"
hyper = { version = "1.6", features = ["http1", "http2"] } hyper = { version = "1.6", features = ["http1", "http2"] }
hyper-util = { version = "0.1", features = ["client", "http2"] } hyper-util = { version = "0.1", features = ["client", "http2"] }
log = "0.4" log = "0.4"
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
tempfile = "3" tempfile = "3"
thiserror = "2" thiserror = "2"
tokio = { version = "1", features = ["macros", "parking_lot", "sync"] } tokio = { version = "1", features = ["macros", "sync"] }

View file

@ -8,13 +8,14 @@ use std::{
Arc, OnceLock, Arc, OnceLock,
atomic::{AtomicBool, AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
}, },
sync::{Condvar, Mutex},
time::Duration, time::Duration,
}; };
use futures_util::{StreamExt, TryFutureExt, future::IntoStream}; use futures_util::{StreamExt, TryFutureExt, future::IntoStream};
use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE}; use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE};
use hyper_util::client::legacy::ResponseFuture; use hyper_util::client::legacy::ResponseFuture;
use parking_lot::{Condvar, Mutex};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use thiserror::Error; use thiserror::Error;
use tokio::sync::{Semaphore, mpsc, oneshot}; use tokio::sync::{Semaphore, mpsc, oneshot};
@ -27,6 +28,8 @@ use crate::range_set::{Range, RangeSet};
pub type AudioFileResult = Result<(), librespot_core::Error>; pub type AudioFileResult = Result<(), librespot_core::Error>;
const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned";
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum AudioFileError { pub enum AudioFileError {
#[error("other end of channel disconnected")] #[error("other end of channel disconnected")]
@ -163,7 +166,10 @@ impl StreamLoaderController {
pub fn range_available(&self, range: Range) -> bool { pub fn range_available(&self, range: Range) -> bool {
if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
let download_status = shared.download_status.lock(); let download_status = shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
range.length range.length
<= download_status <= download_status
@ -214,19 +220,28 @@ impl StreamLoaderController {
self.fetch(range); self.fetch(range);
if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock(); let mut download_status = shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
let download_timeout = AudioFetchParams::get().download_timeout; let download_timeout = AudioFetchParams::get().download_timeout;
while range.length loop {
> download_status if range.length
<= download_status
.downloaded .downloaded
.contained_length_from_value(range.start) .contained_length_from_value(range.start)
{ {
if shared break;
}
let (new_download_status, wait_result) = shared
.cond .cond
.wait_for(&mut download_status, download_timeout) .wait_timeout(download_status, download_timeout)
.timed_out() .expect(DOWNLOAD_STATUS_POISON_MSG);
{
download_status = new_download_status;
if wait_result.timed_out() {
return Err(AudioFileError::WaitTimeout.into()); return Err(AudioFileError::WaitTimeout.into());
} }
@ -558,7 +573,11 @@ impl Read for AudioFileStreaming {
let mut ranges_to_request = RangeSet::new(); let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length_to_request)); ranges_to_request.add_range(&Range::new(offset, length_to_request));
let mut download_status = self.shared.download_status.lock(); let mut download_status = self
.shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested); ranges_to_request.subtract_range_set(&download_status.requested);
@ -571,12 +590,14 @@ impl Read for AudioFileStreaming {
let download_timeout = AudioFetchParams::get().download_timeout; let download_timeout = AudioFetchParams::get().download_timeout;
while !download_status.downloaded.contains(offset) { while !download_status.downloaded.contains(offset) {
if self let (new_download_status, wait_result) = self
.shared .shared
.cond .cond
.wait_for(&mut download_status, download_timeout) .wait_timeout(download_status, download_timeout)
.timed_out() .expect(DOWNLOAD_STATUS_POISON_MSG);
{
download_status = new_download_status;
if wait_result.timed_out() {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,
Error::deadline_exceeded(AudioFileError::WaitTimeout), Error::deadline_exceeded(AudioFileError::WaitTimeout),
@ -619,6 +640,7 @@ impl Seek for AudioFileStreaming {
.shared .shared
.download_status .download_status
.lock() .lock()
.expect(DOWNLOAD_STATUS_POISON_MSG)
.downloaded .downloaded
.contains(requested_pos as usize); .contains(requested_pos as usize);

View file

@ -33,6 +33,7 @@ enum ReceivedData {
} }
const ONE_SECOND: Duration = Duration::from_secs(1); const ONE_SECOND: Duration = Duration::from_secs(1);
const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned";
async fn receive_data( async fn receive_data(
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
@ -124,7 +125,10 @@ async fn receive_data(
if bytes_remaining > 0 { if bytes_remaining > 0 {
{ {
let missing_range = Range::new(offset, bytes_remaining); let missing_range = Range::new(offset, bytes_remaining);
let mut download_status = shared.download_status.lock(); let mut download_status = shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
download_status.requested.subtract_range(&missing_range); download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all(); shared.cond.notify_all();
} }
@ -189,7 +193,11 @@ impl AudioFileFetch {
// The iteration that follows spawns streamers fast, without awaiting them, // The iteration that follows spawns streamers fast, without awaiting them,
// so holding the lock for the entire scope of this function should be faster // so holding the lock for the entire scope of this function should be faster
// then locking and unlocking multiple times. // then locking and unlocking multiple times.
let mut download_status = self.shared.download_status.lock(); let mut download_status = self
.shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested); ranges_to_request.subtract_range_set(&download_status.requested);
@ -227,7 +235,11 @@ impl AudioFileFetch {
let mut missing_data = RangeSet::new(); let mut missing_data = RangeSet::new();
missing_data.add_range(&Range::new(0, self.shared.file_size)); missing_data.add_range(&Range::new(0, self.shared.file_size));
{ {
let download_status = self.shared.download_status.lock(); let download_status = self
.shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
missing_data.subtract_range_set(&download_status.downloaded); missing_data.subtract_range_set(&download_status.downloaded);
missing_data.subtract_range_set(&download_status.requested); missing_data.subtract_range_set(&download_status.requested);
} }
@ -349,7 +361,11 @@ impl AudioFileFetch {
let received_range = Range::new(data.offset, data.data.len()); let received_range = Range::new(data.offset, data.data.len());
let full = { let full = {
let mut download_status = self.shared.download_status.lock(); let mut download_status = self
.shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
download_status.downloaded.add_range(&received_range); download_status.downloaded.add_range(&received_range);
self.shared.cond.notify_all(); self.shared.cond.notify_all();
@ -415,7 +431,10 @@ pub(super) async fn audio_file_fetch(
initial_request.offset + initial_request.length, initial_request.offset + initial_request.length,
); );
let mut download_status = shared.download_status.lock(); let mut download_status = shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
download_status.requested.add_range(&requested_range); download_status.requested.add_range(&requested_range);
} }
@ -466,7 +485,11 @@ pub(super) async fn audio_file_fetch(
if fetch.shared.is_download_streaming() && fetch.has_download_slots_available() { if fetch.shared.is_download_streaming() && fetch.has_download_slots_available() {
let bytes_pending: usize = { let bytes_pending: usize = {
let download_status = fetch.shared.download_status.lock(); let download_status = fetch
.shared
.download_status
.lock()
.expect(DOWNLOAD_STATUS_POISON_MSG);
download_status download_status
.requested .requested

View file

@ -28,6 +28,6 @@ protobuf = "3.7"
rand = { version = "0.9", default-features = false, features = ["small_rng"] } rand = { version = "0.9", default-features = false, features = ["small_rng"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "2" thiserror = "2"
tokio = { version = "1", features = ["macros", "parking_lot", "sync"] } tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = { version = "0.1", default-features = false } tokio-stream = { version = "0.1", default-features = false }
uuid = { version = "1.18", default-features = false, features = ["v4"] } uuid = { version = "1.18", default-features = false, features = ["v4"] }

View file

@ -56,7 +56,7 @@ futures-util = { version = "0.3", default-features = false, features = [
"bilock", "bilock",
"unstable", "unstable",
] } ] }
governor = { version = "0.10", default-features = false, features = ["std"] } governor = { version = "0.10", default-features = false, features = ["std", "dashmap"] }
hmac = "0.12" hmac = "0.12"
httparse = "1.10" httparse = "1.10"
http = "1.3" http = "1.3"
@ -80,7 +80,6 @@ num-bigint = "0.4"
num-derive = "0.4" num-derive = "0.4"
num-integer = "0.1" num-integer = "0.1"
num-traits = "0.2" num-traits = "0.2"
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
pbkdf2 = { version = "0.12", default-features = false, features = ["hmac"] } pbkdf2 = { version = "0.12", default-features = false, features = ["hmac"] }
pin-project-lite = "0.2" pin-project-lite = "0.2"
priority-queue = "2.5" priority-queue = "2.5"
@ -100,7 +99,6 @@ tokio = { version = "1", features = [
"io-util", "io-util",
"macros", "macros",
"net", "net",
"parking_lot",
"rt", "rt",
"sync", "sync",
"time", "time",
@ -119,4 +117,4 @@ vergen-gitcl = { version = "1.0", default-features = false, features = [
] } ] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["macros", "parking_lot"] } tokio = { version = "1", features = ["macros"] }

View file

@ -4,16 +4,17 @@ use std::{
fs::{self, File}, fs::{self, File},
io::{self, Read, Write}, io::{self, Read, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::{Arc, Mutex},
time::SystemTime, time::SystemTime,
}; };
use parking_lot::Mutex;
use priority_queue::PriorityQueue; use priority_queue::PriorityQueue;
use thiserror::Error; use thiserror::Error;
use crate::{Error, FileId, authentication::Credentials, error::ErrorKind}; use crate::{Error, FileId, authentication::Credentials, error::ErrorKind};
const CACHE_LIMITER_POISON_MSG: &str = "cache limiter mutex should not be poisoned";
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum CacheError { pub enum CacheError {
#[error("audio cache location is not configured")] #[error("audio cache location is not configured")]
@ -189,15 +190,24 @@ impl FsSizeLimiter {
} }
fn add(&self, file: &Path, size: u64) { fn add(&self, file: &Path, size: u64) {
self.limiter.lock().add(file, size, SystemTime::now()) self.limiter
.lock()
.expect(CACHE_LIMITER_POISON_MSG)
.add(file, size, SystemTime::now())
} }
fn touch(&self, file: &Path) -> bool { fn touch(&self, file: &Path) -> bool {
self.limiter.lock().update(file, SystemTime::now()) self.limiter
.lock()
.expect(CACHE_LIMITER_POISON_MSG)
.update(file, SystemTime::now())
} }
fn remove(&self, file: &Path) -> bool { fn remove(&self, file: &Path) -> bool {
self.limiter.lock().remove(file) self.limiter
.lock()
.expect(CACHE_LIMITER_POISON_MSG)
.remove(file)
} }
fn prune_internal<F: FnMut() -> Option<PathBuf>>(mut pop: F) -> Result<(), Error> { fn prune_internal<F: FnMut() -> Option<PathBuf>>(mut pop: F) -> Result<(), Error> {
@ -232,7 +242,7 @@ impl FsSizeLimiter {
} }
fn prune(&self) -> Result<(), Error> { fn prune(&self) -> Result<(), Error> {
Self::prune_internal(|| self.limiter.lock().pop()) Self::prune_internal(|| self.limiter.lock().expect(CACHE_LIMITER_POISON_MSG).pop())
} }
fn new(path: &Path, limit: u64) -> Result<Self, Error> { fn new(path: &Path, limit: u64) -> Result<Self, Error> {

View file

@ -1,20 +1,23 @@
pub(crate) const COMPONENT_POISON_MSG: &str = "component mutex should not be poisoned";
macro_rules! component { macro_rules! component {
($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => { ($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => {
#[derive(Clone)] #[derive(Clone)]
pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::parking_lot::Mutex<$inner>)>); pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::std::sync::Mutex<$inner>)>);
impl $name { impl $name {
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn new(session: $crate::session::SessionWeak) -> $name { pub(crate) fn new(session: $crate::session::SessionWeak) -> $name {
debug!(target:"librespot::component", "new {}", stringify!($name)); debug!(target:"librespot::component", "new {}", stringify!($name));
$name(::std::sync::Arc::new((session, ::parking_lot::Mutex::new($inner { $name(::std::sync::Arc::new((session, ::std::sync::Mutex::new($inner {
$($key : $value,)* $($key : $value,)*
})))) }))))
} }
#[allow(dead_code)] #[allow(dead_code)]
fn lock<F: FnOnce(&mut $inner) -> R, R>(&self, f: F) -> R { fn lock<F: FnOnce(&mut $inner) -> R, R>(&self, f: F) -> R {
let mut inner = (self.0).1.lock(); let mut inner = (self.0).1.lock()
.expect($crate::component::COMPONENT_POISON_MSG);
f(&mut inner) f(&mut inner)
} }

View file

@ -6,7 +6,7 @@ use std::{
iter, iter,
pin::Pin, pin::Pin,
sync::{ sync::{
Arc, Arc, Mutex,
atomic::{self, AtomicBool}, atomic::{self, AtomicBool},
}, },
task::Poll, task::Poll,
@ -15,7 +15,6 @@ use std::{
use futures_core::{Future, Stream}; use futures_core::{Future, Stream};
use futures_util::{SinkExt, StreamExt, future::join_all}; use futures_util::{SinkExt, StreamExt, future::join_all};
use parking_lot::Mutex;
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
select, select,
@ -57,6 +56,11 @@ const PING_TIMEOUT: Duration = Duration::from_secs(3);
const RECONNECT_INTERVAL: Duration = Duration::from_secs(10); const RECONNECT_INTERVAL: Duration = Duration::from_secs(10);
const DEALER_REQUEST_HANDLERS_POISON_MSG: &str =
"dealer request handlers mutex should not be poisoned";
const DEALER_MESSAGE_HANDLERS_POISON_MSG: &str =
"dealer message handlers mutex should not be poisoned";
struct Response { struct Response {
pub success: bool, pub success: bool,
} }
@ -350,6 +354,7 @@ impl DealerShared {
if self if self
.message_handlers .message_handlers
.lock() .lock()
.expect(DEALER_MESSAGE_HANDLERS_POISON_MSG)
.retain(split, &mut |tx| tx.send(msg.clone()).is_ok()) .retain(split, &mut |tx| tx.send(msg.clone()).is_ok())
{ {
return; return;
@ -387,7 +392,10 @@ impl DealerShared {
return; return;
}; };
let handler_map = self.request_handlers.lock(); let handler_map = self
.request_handlers
.lock()
.expect(DEALER_REQUEST_HANDLERS_POISON_MSG);
if let Some(handler) = handler_map.get(split) { if let Some(handler) = handler_map.get(split) {
handler.handle_request(payload_request, responder); handler.handle_request(payload_request, responder);
@ -425,21 +433,51 @@ impl Dealer {
where where
H: RequestHandler, H: RequestHandler,
{ {
add_handler(&mut self.shared.request_handlers.lock(), uri, handler) add_handler(
&mut self
.shared
.request_handlers
.lock()
.expect(DEALER_REQUEST_HANDLERS_POISON_MSG),
uri,
handler,
)
} }
pub fn remove_handler(&self, uri: &str) -> Option<Box<dyn RequestHandler>> { pub fn remove_handler(&self, uri: &str) -> Option<Box<dyn RequestHandler>> {
remove_handler(&mut self.shared.request_handlers.lock(), uri) remove_handler(
&mut self
.shared
.request_handlers
.lock()
.expect(DEALER_REQUEST_HANDLERS_POISON_MSG),
uri,
)
} }
pub fn subscribe(&self, uris: &[&str]) -> Result<Subscription, Error> { pub fn subscribe(&self, uris: &[&str]) -> Result<Subscription, Error> {
subscribe(&mut self.shared.message_handlers.lock(), uris) subscribe(
&mut self
.shared
.message_handlers
.lock()
.expect(DEALER_MESSAGE_HANDLERS_POISON_MSG),
uris,
)
} }
pub fn handles(&self, uri: &str) -> bool { pub fn handles(&self, uri: &str) -> bool {
handles( handles(
&self.shared.request_handlers.lock(), &self
&self.shared.message_handlers.lock(), .shared
.request_handlers
.lock()
.expect(DEALER_REQUEST_HANDLERS_POISON_MSG),
&self
.shared
.message_handlers
.lock()
.expect(DEALER_MESSAGE_HANDLERS_POISON_MSG),
uri, uri,
) )
} }

View file

@ -1,5 +1,4 @@
use std::{ use std::{
collections::HashMap,
sync::OnceLock, sync::OnceLock,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -7,7 +6,8 @@ use std::{
use bytes::Bytes; use bytes::Bytes;
use futures_util::{FutureExt, future::IntoStream}; use futures_util::{FutureExt, future::IntoStream};
use governor::{ use governor::{
Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware,
state::keyed::DashMapStateStore,
}; };
use http::{Uri, header::HeaderValue}; use http::{Uri, header::HeaderValue};
use http_body_util::{BodyExt, Full}; use http_body_util::{BodyExt, Full};
@ -18,7 +18,6 @@ use hyper_util::{
rt::TokioExecutor, rt::TokioExecutor,
}; };
use nonzero_ext::nonzero; use nonzero_ext::nonzero;
use parking_lot::Mutex;
use thiserror::Error; use thiserror::Error;
use url::Url; use url::Url;
@ -102,8 +101,7 @@ pub struct HttpClient {
// while the DashMap variant is more performant, our level of concurrency // while the DashMap variant is more performant, our level of concurrency
// is pretty low so we can save pulling in that extra dependency // is pretty low so we can save pulling in that extra dependency
rate_limiter: rate_limiter: RateLimiter<String, DashMapStateStore<String>, MonotonicClock, NoOpMiddleware>,
RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
} }
impl HttpClient { impl HttpClient {

View file

@ -4,8 +4,7 @@ use std::{
io, io,
pin::Pin, pin::Pin,
process::exit, process::exit,
sync::OnceLock, sync::{Arc, OnceLock, RwLock, Weak},
sync::{Arc, Weak},
task::{Context, Poll}, task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
@ -34,7 +33,6 @@ use futures_core::TryStream;
use futures_util::StreamExt; use futures_util::StreamExt;
use librespot_protocol::authentication::AuthenticationType; use librespot_protocol::authentication::AuthenticationType;
use num_traits::FromPrimitive; use num_traits::FromPrimitive;
use parking_lot::RwLock;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use quick_xml::events::Event; use quick_xml::events::Event;
use thiserror::Error; use thiserror::Error;
@ -45,6 +43,8 @@ use tokio::{
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid; use uuid::Uuid;
const SESSION_DATA_POISON_MSG: &str = "session data rwlock should not be poisoned";
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum SessionError { pub enum SessionError {
#[error(transparent)] #[error(transparent)]
@ -338,7 +338,11 @@ impl Session {
} }
pub fn time_delta(&self) -> i64 { pub fn time_delta(&self) -> i64 {
self.0.data.read().time_delta self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.time_delta
} }
pub fn spawn<T>(&self, task: T) pub fn spawn<T>(&self, task: T)
@ -388,15 +392,32 @@ impl Session {
// you need more fields at once, in which case this can spare multiple `read` // you need more fields at once, in which case this can spare multiple `read`
// locks. // locks.
pub fn user_data(&self) -> UserData { pub fn user_data(&self) -> UserData {
self.0.data.read().user_data.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.clone()
} }
pub fn session_id(&self) -> String { pub fn session_id(&self) -> String {
self.0.data.read().session_id.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.session_id
.clone()
} }
pub fn set_session_id(&self, session_id: &str) { pub fn set_session_id(&self, session_id: &str) {
session_id.clone_into(&mut self.0.data.write().session_id); session_id.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.session_id,
);
} }
pub fn device_id(&self) -> &str { pub fn device_id(&self) -> &str {
@ -404,63 +425,155 @@ impl Session {
} }
pub fn client_id(&self) -> String { pub fn client_id(&self) -> String {
self.0.data.read().client_id.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.client_id
.clone()
} }
pub fn set_client_id(&self, client_id: &str) { pub fn set_client_id(&self, client_id: &str) {
client_id.clone_into(&mut self.0.data.write().client_id); client_id.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.client_id,
);
} }
pub fn client_name(&self) -> String { pub fn client_name(&self) -> String {
self.0.data.read().client_name.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.client_name
.clone()
} }
pub fn set_client_name(&self, client_name: &str) { pub fn set_client_name(&self, client_name: &str) {
client_name.clone_into(&mut self.0.data.write().client_name); client_name.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.client_name,
);
} }
pub fn client_brand_name(&self) -> String { pub fn client_brand_name(&self) -> String {
self.0.data.read().client_brand_name.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.client_brand_name
.clone()
} }
pub fn set_client_brand_name(&self, client_brand_name: &str) { pub fn set_client_brand_name(&self, client_brand_name: &str) {
client_brand_name.clone_into(&mut self.0.data.write().client_brand_name); client_brand_name.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.client_brand_name,
);
} }
pub fn client_model_name(&self) -> String { pub fn client_model_name(&self) -> String {
self.0.data.read().client_model_name.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.client_model_name
.clone()
} }
pub fn set_client_model_name(&self, client_model_name: &str) { pub fn set_client_model_name(&self, client_model_name: &str) {
client_model_name.clone_into(&mut self.0.data.write().client_model_name); client_model_name.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.client_model_name,
);
} }
pub fn connection_id(&self) -> String { pub fn connection_id(&self) -> String {
self.0.data.read().connection_id.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.connection_id
.clone()
} }
pub fn set_connection_id(&self, connection_id: &str) { pub fn set_connection_id(&self, connection_id: &str) {
connection_id.clone_into(&mut self.0.data.write().connection_id); connection_id.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.connection_id,
);
} }
pub fn username(&self) -> String { pub fn username(&self) -> String {
self.0.data.read().user_data.canonical_username.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.canonical_username
.clone()
} }
pub fn set_username(&self, username: &str) { pub fn set_username(&self, username: &str) {
username.clone_into(&mut self.0.data.write().user_data.canonical_username); username.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.canonical_username,
);
} }
pub fn auth_data(&self) -> Vec<u8> { pub fn auth_data(&self) -> Vec<u8> {
self.0.data.read().auth_data.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.auth_data
.clone()
} }
pub fn set_auth_data(&self, auth_data: &[u8]) { pub fn set_auth_data(&self, auth_data: &[u8]) {
auth_data.clone_into(&mut self.0.data.write().auth_data); auth_data.clone_into(
&mut self
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.auth_data,
);
} }
pub fn country(&self) -> String { pub fn country(&self) -> String {
self.0.data.read().user_data.country.clone() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.country
.clone()
} }
pub fn filter_explicit_content(&self) -> bool { pub fn filter_explicit_content(&self) -> bool {
@ -489,6 +602,7 @@ impl Session {
self.0 self.0
.data .data
.write() .write()
.expect(SESSION_DATA_POISON_MSG)
.user_data .user_data
.attributes .attributes
.insert(key.to_owned(), value.to_owned()) .insert(key.to_owned(), value.to_owned())
@ -497,11 +611,24 @@ impl Session {
pub fn set_user_attributes(&self, attributes: UserAttributes) { pub fn set_user_attributes(&self, attributes: UserAttributes) {
Self::check_catalogue(&attributes); Self::check_catalogue(&attributes);
self.0.data.write().user_data.attributes.extend(attributes) self.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.attributes
.extend(attributes)
} }
pub fn get_user_attribute(&self, key: &str) -> Option<String> { pub fn get_user_attribute(&self, key: &str) -> Option<String> {
self.0.data.read().user_data.attributes.get(key).cloned() self.0
.data
.read()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.attributes
.get(key)
.cloned()
} }
fn weak(&self) -> SessionWeak { fn weak(&self) -> SessionWeak {
@ -510,13 +637,13 @@ impl Session {
pub fn shutdown(&self) { pub fn shutdown(&self) {
debug!("Shutdown: Invalidating session"); debug!("Shutdown: Invalidating session");
self.0.data.write().invalid = true; self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
self.mercury().shutdown(); self.mercury().shutdown();
self.channel().shutdown(); self.channel().shutdown();
} }
pub fn is_invalid(&self) -> bool { pub fn is_invalid(&self) -> bool {
self.0.data.read().invalid self.0.data.read().expect(SESSION_DATA_POISON_MSG).invalid
} }
} }
@ -643,7 +770,7 @@ where
.unwrap_or(Duration::ZERO) .unwrap_or(Duration::ZERO)
.as_secs() as i64; .as_secs() as i64;
{ {
let mut data = session.0.data.write(); let mut data = session.0.data.write().expect(SESSION_DATA_POISON_MSG);
data.time_delta = server_timestamp.saturating_sub(timestamp); data.time_delta = server_timestamp.saturating_sub(timestamp);
} }
@ -668,7 +795,13 @@ where
Some(CountryCode) => { Some(CountryCode) => {
let country = String::from_utf8(data.as_ref().to_owned())?; let country = String::from_utf8(data.as_ref().to_owned())?;
info!("Country: {country:?}"); info!("Country: {country:?}");
session.0.data.write().user_data.country = country; session
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.country = country;
Ok(()) Ok(())
} }
Some(StreamChunkRes) | Some(ChannelError) => session.channel().dispatch(cmd, data), Some(StreamChunkRes) | Some(ChannelError) => session.channel().dispatch(cmd, data),
@ -713,7 +846,13 @@ where
trace!("Received product info: {user_attributes:#?}"); trace!("Received product info: {user_attributes:#?}");
Session::check_catalogue(&user_attributes); Session::check_catalogue(&user_attributes);
session.0.data.write().user_data.attributes = user_attributes; session
.0
.data
.write()
.expect(SESSION_DATA_POISON_MSG)
.user_data
.attributes = user_attributes;
Ok(()) Ok(())
} }
Some(SecretBlock) Some(SecretBlock)

View file

@ -51,7 +51,7 @@ serde_repr = "0.1"
serde_json = "1.0" serde_json = "1.0"
sha1 = "0.10" sha1 = "0.10"
thiserror = "2" thiserror = "2"
tokio = { version = "1", features = ["parking_lot", "sync", "rt"] } tokio = { version = "1", features = ["sync", "rt"] }
zbus = { version = "5", default-features = false, features = [ zbus = { version = "5", default-features = false, features = [
"tokio", "tokio",
], optional = true } ], optional = true }
@ -59,4 +59,4 @@ zbus = { version = "5", default-features = false, features = [
[dev-dependencies] [dev-dependencies]
futures = "0.3" futures = "0.3"
hex = "0.4" hex = "0.4"
tokio = { version = "1", features = ["macros", "parking_lot", "rt"] } tokio = { version = "1", features = ["macros", "rt"] }

View file

@ -53,11 +53,10 @@ librespot-metadata = { version = "0.7.1", path = "../metadata", default-features
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }
log = "0.4" log = "0.4"
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
portable-atomic = "1" portable-atomic = "1"
shell-words = "1.1" shell-words = "1.1"
thiserror = "2" thiserror = "2"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] } tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
zerocopy = { version = "0.8", features = ["derive"] } zerocopy = { version = "0.8", features = ["derive"] }
# Backends # Backends

View file

@ -1,3 +1,5 @@
use std::sync::{Arc, Mutex};
use gstreamer::{ use gstreamer::{
State, State,
event::{FlushStart, FlushStop}, event::{FlushStart, FlushStop},
@ -8,8 +10,7 @@ use gstreamer as gst;
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use gstreamer_audio as gst_audio; use gstreamer_audio as gst_audio;
use parking_lot::Mutex; const GSTREAMER_ASYNC_ERROR_POISON_MSG: &str = "gstreamer async error mutex should not be poisoned";
use std::sync::Arc;
use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
@ -97,7 +98,9 @@ impl Open for GstreamerSink {
gst::MessageView::Eos(_) => { gst::MessageView::Eos(_) => {
println!("gst signaled end of stream"); println!("gst signaled end of stream");
let mut async_error_storage = async_error_clone.lock(); let mut async_error_storage = async_error_clone
.lock()
.expect(GSTREAMER_ASYNC_ERROR_POISON_MSG);
*async_error_storage = Some(String::from("gst signaled end of stream")); *async_error_storage = Some(String::from("gst signaled end of stream"));
} }
gst::MessageView::Error(err) => { gst::MessageView::Error(err) => {
@ -108,7 +111,9 @@ impl Open for GstreamerSink {
err.debug() err.debug()
); );
let mut async_error_storage = async_error_clone.lock(); let mut async_error_storage = async_error_clone
.lock()
.expect(GSTREAMER_ASYNC_ERROR_POISON_MSG);
*async_error_storage = Some(format!( *async_error_storage = Some(format!(
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()), err.src().map(|s| s.path_string()),
@ -138,7 +143,10 @@ impl Open for GstreamerSink {
impl Sink for GstreamerSink { impl Sink for GstreamerSink {
fn start(&mut self) -> SinkResult<()> { fn start(&mut self) -> SinkResult<()> {
*self.async_error.lock() = None; *self
.async_error
.lock()
.expect(GSTREAMER_ASYNC_ERROR_POISON_MSG) = None;
self.appsrc.send_event(FlushStop::new(true)); self.appsrc.send_event(FlushStop::new(true));
self.bufferpool self.bufferpool
.set_active(true) .set_active(true)
@ -150,7 +158,10 @@ impl Sink for GstreamerSink {
} }
fn stop(&mut self) -> SinkResult<()> { fn stop(&mut self) -> SinkResult<()> {
*self.async_error.lock() = None; *self
.async_error
.lock()
.expect(GSTREAMER_ASYNC_ERROR_POISON_MSG) = None;
self.appsrc.send_event(FlushStart::new()); self.appsrc.send_event(FlushStart::new());
self.pipeline self.pipeline
.set_state(State::Paused) .set_state(State::Paused)
@ -173,7 +184,11 @@ impl Drop for GstreamerSink {
impl SinkAsBytes for GstreamerSink { impl SinkAsBytes for GstreamerSink {
#[inline] #[inline]
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
if let Some(async_error) = &*self.async_error.lock() { if let Some(async_error) = &*self
.async_error
.lock()
.expect(GSTREAMER_ASYNC_ERROR_POISON_MSG)
{
return Err(SinkError::OnWrite(async_error.to_string())); return Err(SinkError::OnWrite(async_error.to_string()));
} }

View file

@ -6,6 +6,7 @@ use std::{
mem, mem,
pin::Pin, pin::Pin,
process::exit, process::exit,
sync::Mutex,
sync::{ sync::{
Arc, Arc,
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -32,7 +33,6 @@ use futures_util::{
stream::futures_unordered::FuturesUnordered, stream::futures_unordered::FuturesUnordered,
}; };
use librespot_metadata::track::Tracks; use librespot_metadata::track::Tracks;
use parking_lot::Mutex;
use symphonia::core::io::MediaSource; use symphonia::core::io::MediaSource;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@ -46,6 +46,8 @@ pub const PCM_AT_0DBFS: f64 = 1.0;
// otherwise expect in Vorbis comments. This packet isn't well-formed and players may balk at it. // otherwise expect in Vorbis comments. This packet isn't well-formed and players may balk at it.
const SPOTIFY_OGG_HEADER_END: u64 = 0xa7; const SPOTIFY_OGG_HEADER_END: u64 = 0xa7;
const LOAD_HANDLES_POISON_MSG: &str = "load handles mutex should not be poisoned";
pub type PlayerResult = Result<(), Error>; pub type PlayerResult = Result<(), Error>;
pub struct Player { pub struct Player {
@ -2281,11 +2283,11 @@ impl PlayerInternal {
let _ = result_tx.send(data); let _ = result_tx.send(data);
} }
let mut load_handles = load_handles_clone.lock(); let mut load_handles = load_handles_clone.lock().expect(LOAD_HANDLES_POISON_MSG);
load_handles.remove(&thread::current().id()); load_handles.remove(&thread::current().id());
}); });
let mut load_handles = self.load_handles.lock(); let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG);
load_handles.insert(load_handle.thread().id(), load_handle); load_handles.insert(load_handle.thread().id(), load_handle);
result_rx.map_err(|_| ()) result_rx.map_err(|_| ())
@ -2320,7 +2322,7 @@ impl Drop for PlayerInternal {
let handles: Vec<thread::JoinHandle<()>> = { let handles: Vec<thread::JoinHandle<()>> = {
// waiting for the thread while holding the mutex would result in a deadlock // waiting for the thread while holding the mutex would result in a deadlock
let mut load_handles = self.load_handles.lock(); let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG);
load_handles load_handles
.drain() .drain()