1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-03 09:49:31 +02:00

Change to parking_lot and remove remaining panics

This commit is contained in:
Roderick van Domburg 2021-12-26 22:55:45 +01:00
parent 62461be1fc
commit b4f7a9e35e
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
13 changed files with 200 additions and 121 deletions

View file

@ -20,11 +20,11 @@ bytes = "1"
chrono = "0.4"
form_urlencoded = "1.0"
futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "sink", "unstable"] }
hmac = "0.11"
httparse = "1.3"
http = "0.2"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] }
hyper = { version = "0.14", features = ["client", "http1", "http2", "tcp"] }
hyper-proxy = { version = "0.9.1", default-features = false, features = ["rustls"] }
hyper-rustls = { version = "0.22", default-features = false, features = ["native-tokio"] }
log = "0.4"
@ -34,10 +34,11 @@ num-derive = "0.3"
num-integer = "0.1"
num-traits = "0.2"
once_cell = "1.5.2"
parking_lot = { version = "0.11", features = ["deadlock_detection"] }
pbkdf2 = { version = "0.8", default-features = false, features = ["hmac"] }
priority-queue = "1.1"
protobuf = "2.14.0"
quick-xml = { version = "0.22", features = [ "serialize" ] }
quick-xml = { version = "0.22", features = ["serialize"] }
rand = "0.8"
rustls = "0.19"
rustls-native-certs = "0.5"
@ -46,7 +47,7 @@ serde_json = "1.0"
sha-1 = "0.9"
shannon = "0.2.0"
thiserror = "1.0"
tokio = { version = "1.5", features = ["io-util", "macros", "net", "rt", "time", "sync"] }
tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] }
tokio-stream = "0.1.1"
tokio-tungstenite = { version = "0.14", default-features = false, features = ["rustls-tls"] }
tokio-util = { version = "0.6", features = ["codec"] }
@ -59,4 +60,4 @@ vergen = "3.0.4"
[dev-dependencies]
env_logger = "0.8"
tokio = {version = "1.0", features = ["macros"] }
tokio = { version = "1.0", features = ["macros", "parking_lot"] }

View file

@ -4,10 +4,11 @@ use std::{
fs::{self, File},
io::{self, Read, Write},
path::{Path, PathBuf},
sync::{Arc, Mutex},
sync::Arc,
time::SystemTime,
};
use parking_lot::Mutex;
use priority_queue::PriorityQueue;
use thiserror::Error;
@ -187,50 +188,42 @@ impl FsSizeLimiter {
}
}
fn add(&self, file: &Path, size: u64) -> Result<(), Error> {
self.limiter
.lock()
.unwrap()
.add(file, size, SystemTime::now());
Ok(())
fn add(&self, file: &Path, size: u64) {
self.limiter.lock().add(file, size, SystemTime::now());
}
fn touch(&self, file: &Path) -> Result<bool, Error> {
Ok(self.limiter.lock().unwrap().update(file, SystemTime::now()))
fn touch(&self, file: &Path) -> bool {
self.limiter.lock().update(file, SystemTime::now())
}
fn remove(&self, file: &Path) -> Result<bool, Error> {
Ok(self.limiter.lock().unwrap().remove(file))
fn remove(&self, file: &Path) -> bool {
self.limiter.lock().remove(file)
}
fn prune_internal<F: FnMut() -> Result<Option<PathBuf>, Error>>(
mut pop: F,
) -> Result<(), Error> {
fn prune_internal<F: FnMut() -> Option<PathBuf>>(mut pop: F) -> Result<(), Error> {
let mut first = true;
let mut count = 0;
let mut last_error = None;
while let Ok(result) = pop() {
if let Some(file) = result {
if first {
debug!("Cache dir exceeds limit, removing least recently used files.");
first = false;
}
let res = fs::remove_file(&file);
if let Err(e) = res {
warn!("Could not remove file {:?} from cache dir: {}", file, e);
last_error = Some(e);
} else {
count += 1;
}
while let Some(file) = pop() {
if first {
debug!("Cache dir exceeds limit, removing least recently used files.");
first = false;
}
if count > 0 {
info!("Removed {} cache files.", count);
let res = fs::remove_file(&file);
if let Err(e) = res {
warn!("Could not remove file {:?} from cache dir: {}", file, e);
last_error = Some(e);
} else {
count += 1;
}
}
if count > 0 {
info!("Removed {} cache files.", count);
}
if let Some(err) = last_error {
Err(err.into())
} else {
@ -239,14 +232,14 @@ impl FsSizeLimiter {
}
fn prune(&self) -> Result<(), Error> {
Self::prune_internal(|| Ok(self.limiter.lock().unwrap().pop()))
Self::prune_internal(|| self.limiter.lock().pop())
}
fn new(path: &Path, limit: u64) -> Result<Self, Error> {
let mut limiter = SizeLimiter::new(limit);
Self::init_dir(&mut limiter, path);
Self::prune_internal(|| Ok(limiter.pop()))?;
Self::prune_internal(|| limiter.pop())?;
Ok(Self {
limiter: Mutex::new(limiter),
@ -388,8 +381,8 @@ impl Cache {
match File::open(&path) {
Ok(file) => {
if let Some(limiter) = self.size_limiter.as_deref() {
if let Err(e) = limiter.touch(&path) {
error!("limiter could not touch {:?}: {}", path, e);
if !limiter.touch(&path) {
error!("limiter could not touch {:?}", path);
}
}
Some(file)
@ -411,8 +404,8 @@ impl Cache {
.and_then(|mut file| io::copy(contents, &mut file))
{
if let Some(limiter) = self.size_limiter.as_deref() {
limiter.add(&path, size)?;
limiter.prune()?
limiter.add(&path, size);
limiter.prune()?;
}
return Ok(());
}
@ -426,7 +419,7 @@ impl Cache {
fs::remove_file(&path)?;
if let Some(limiter) = self.size_limiter.as_deref() {
limiter.remove(&path)?;
limiter.remove(&path);
}
Ok(())

View file

@ -1,20 +1,20 @@
macro_rules! component {
($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => {
#[derive(Clone)]
pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::std::sync::Mutex<$inner>)>);
pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::parking_lot::Mutex<$inner>)>);
impl $name {
#[allow(dead_code)]
pub(crate) fn new(session: $crate::session::SessionWeak) -> $name {
debug!(target:"librespot::component", "new {}", stringify!($name));
$name(::std::sync::Arc::new((session, ::std::sync::Mutex::new($inner {
$name(::std::sync::Arc::new((session, ::parking_lot::Mutex::new($inner {
$($key : $value,)*
}))))
}
#[allow(dead_code)]
fn lock<F: FnOnce(&mut $inner) -> R, R>(&self, f: F) -> R {
let mut inner = (self.0).1.lock().unwrap();
let mut inner = (self.0).1.lock();
f(&mut inner)
}

View file

@ -6,7 +6,7 @@ use std::{
pin::Pin,
sync::{
atomic::{self, AtomicBool},
Arc, Mutex,
Arc,
},
task::Poll,
time::Duration,
@ -14,6 +14,7 @@ use std::{
use futures_core::{Future, Stream};
use futures_util::{future::join_all, SinkExt, StreamExt};
use parking_lot::Mutex;
use thiserror::Error;
use tokio::{
select,
@ -310,7 +311,6 @@ impl DealerShared {
if let Some(split) = split_uri(&msg.uri) {
self.message_handlers
.lock()
.unwrap()
.retain(split, &mut |tx| tx.send(msg.clone()).is_ok());
}
}
@ -330,7 +330,7 @@ impl DealerShared {
};
{
let handler_map = self.request_handlers.lock().unwrap();
let handler_map = self.request_handlers.lock();
if let Some(handler) = handler_map.get(split) {
handler.handle_request(request, responder);
@ -349,7 +349,9 @@ impl DealerShared {
}
async fn closed(&self) {
self.notify_drop.acquire().await.unwrap_err();
if self.notify_drop.acquire().await.is_ok() {
error!("should never have gotten a permit");
}
}
fn is_closed(&self) -> bool {
@ -367,19 +369,15 @@ impl Dealer {
where
H: RequestHandler,
{
add_handler(
&mut self.shared.request_handlers.lock().unwrap(),
uri,
handler,
)
add_handler(&mut self.shared.request_handlers.lock(), uri, handler)
}
pub fn remove_handler(&self, uri: &str) -> Option<Box<dyn RequestHandler>> {
remove_handler(&mut self.shared.request_handlers.lock().unwrap(), uri)
remove_handler(&mut self.shared.request_handlers.lock(), uri)
}
pub fn subscribe(&self, uris: &[&str]) -> Result<Subscription, Error> {
subscribe(&mut self.shared.message_handlers.lock().unwrap(), uris)
subscribe(&mut self.shared.message_handlers.lock(), uris)
}
pub async fn close(mut self) {

View file

@ -6,7 +6,7 @@ use std::{
process::exit,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock, Weak,
Arc, Weak,
},
task::{Context, Poll},
time::{SystemTime, UNIX_EPOCH},
@ -18,6 +18,7 @@ use futures_core::TryStream;
use futures_util::{future, ready, StreamExt, TryStreamExt};
use num_traits::FromPrimitive;
use once_cell::sync::OnceCell;
use parking_lot::RwLock;
use quick_xml::events::Event;
use thiserror::Error;
use tokio::sync::mpsc;
@ -138,8 +139,7 @@ impl Session {
connection::authenticate(&mut transport, credentials, &session.config().device_id)
.await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username);
session.0.data.write().unwrap().user_data.canonical_username =
reusable_credentials.username.clone();
session.0.data.write().user_data.canonical_username = reusable_credentials.username.clone();
if let Some(cache) = session.cache() {
cache.save_credentials(&reusable_credentials);
}
@ -200,7 +200,7 @@ impl Session {
}
pub fn time_delta(&self) -> i64 {
self.0.data.read().unwrap().time_delta
self.0.data.read().time_delta
}
pub fn spawn<T>(&self, task: T)
@ -253,7 +253,7 @@ impl Session {
}
.as_secs() as i64;
self.0.data.write().unwrap().time_delta = server_timestamp - timestamp;
self.0.data.write().time_delta = server_timestamp - timestamp;
self.debug_info();
self.send_packet(Pong, vec![0, 0, 0, 0])
@ -261,7 +261,7 @@ impl Session {
Some(CountryCode) => {
let country = String::from_utf8(data.as_ref().to_owned())?;
info!("Country: {:?}", country);
self.0.data.write().unwrap().user_data.country = country;
self.0.data.write().user_data.country = country;
Ok(())
}
Some(StreamChunkRes) | Some(ChannelError) => self.channel().dispatch(cmd, data),
@ -306,7 +306,7 @@ impl Session {
trace!("Received product info: {:#?}", user_attributes);
Self::check_catalogue(&user_attributes);
self.0.data.write().unwrap().user_data.attributes = user_attributes;
self.0.data.write().user_data.attributes = user_attributes;
Ok(())
}
Some(PongAck)
@ -335,7 +335,7 @@ impl Session {
}
pub fn user_data(&self) -> UserData {
self.0.data.read().unwrap().user_data.clone()
self.0.data.read().user_data.clone()
}
pub fn device_id(&self) -> &str {
@ -343,21 +343,15 @@ impl Session {
}
pub fn connection_id(&self) -> String {
self.0.data.read().unwrap().connection_id.clone()
self.0.data.read().connection_id.clone()
}
pub fn set_connection_id(&self, connection_id: String) {
self.0.data.write().unwrap().connection_id = connection_id;
self.0.data.write().connection_id = connection_id;
}
pub fn username(&self) -> String {
self.0
.data
.read()
.unwrap()
.user_data
.canonical_username
.clone()
self.0.data.read().user_data.canonical_username.clone()
}
pub fn set_user_attribute(&self, key: &str, value: &str) -> Option<String> {
@ -368,7 +362,6 @@ impl Session {
self.0
.data
.write()
.unwrap()
.user_data
.attributes
.insert(key.to_owned(), value.to_owned())
@ -377,13 +370,7 @@ impl Session {
pub fn set_user_attributes(&self, attributes: UserAttributes) {
Self::check_catalogue(&attributes);
self.0
.data
.write()
.unwrap()
.user_data
.attributes
.extend(attributes)
self.0.data.write().user_data.attributes.extend(attributes)
}
fn weak(&self) -> SessionWeak {
@ -395,14 +382,14 @@ impl Session {
}
pub fn shutdown(&self) {
debug!("Invalidating session[{}]", self.0.session_id);
self.0.data.write().unwrap().invalid = true;
debug!("Invalidating session [{}]", self.0.session_id);
self.0.data.write().invalid = true;
self.mercury().shutdown();
self.channel().shutdown();
}
pub fn is_invalid(&self) -> bool {
self.0.data.read().unwrap().invalid
self.0.data.read().invalid
}
}
@ -415,7 +402,8 @@ impl SessionWeak {
}
pub(crate) fn upgrade(&self) -> Session {
self.try_upgrade().expect("Session died") // TODO
self.try_upgrade()
.expect("session was dropped and so should have this component")
}
}