diff --git a/Cargo.lock b/Cargo.lock index 50778598..dfb1aa95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" + [[package]] name = "headers" version = "0.3.4" @@ -912,6 +918,16 @@ dependencies = [ "libc", ] +[[package]] +name = "indexmap" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.9" @@ -1207,6 +1223,7 @@ dependencies = [ "num-traits", "once_cell", "pbkdf2", + "priority-queue", "protobuf", "rand", "serde", @@ -1723,6 +1740,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5c99d529f0d30937f6f4b8a86d988047327bb88d04d2c4afc356de74722131" +[[package]] +name = "priority-queue" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16f1277a63996195ef38361e2c909314614c6f25f2ac4968f87dfd94a625d3d" +dependencies = [ + "autocfg", + "indexmap", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" diff --git a/core/Cargo.toml b/core/Cargo.toml index 29f4f332..8700875a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,6 +31,7 @@ num-integer = "0.1" num-traits = "0.2" once_cell = "1.5.2" pbkdf2 = { version = "0.7", default-features = false, features = ["hmac"] } +priority-queue = "1.1" protobuf = "~2.14.0" rand = "0.8" serde = { version = "1.0", features = ["derive"] } diff --git a/core/src/cache.rs b/core/src/cache.rs index 55c9ab01..d96e79a2 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -1,30 +1,173 @@ -use std::fs; -use std::fs::File; +use std::cmp::Reverse; +use std::collections::HashMap; +use std::fs::{self, File}; use std::io::{self, Error, ErrorKind, Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::SystemTime; + +use priority_queue::PriorityQueue; use crate::authentication::Credentials; use crate::spotify_id::FileId; +struct SizeLimiter { + queue: PriorityQueue>, + sizes: HashMap, + size_limit: u64, + in_use: u64, +} + +impl SizeLimiter { + fn new(limit: u64) -> Self { + Self { + queue: PriorityQueue::new(), + sizes: HashMap::new(), + size_limit: limit, + in_use: 0, + } + } + + /// Adds an entry to this data structure. + /// + /// If this file is already contained, it will be updated accordingly. + fn add(&mut self, file: &Path, size: u64, accessed: SystemTime) { + self.in_use += size; + self.queue.push(file.to_owned(), Reverse(accessed)); + if let Some(old_size) = self.sizes.insert(file.to_owned(), size) { + // It's important that decreasing happens after + // increasing the size, to prevent an overflow. + self.in_use -= old_size; + } + } + + /// Returns the least recently accessed file if the size of the cache exceeds + /// the limit. + /// + /// The entry is removed from the data structure, but the caller is responsible + /// to delete the file in the file system. + fn pop(&mut self) -> Option { + if self.in_use > self.size_limit { + let (next, _) = self.queue.pop()?; + // panic safety: It is guaranteed that `queue` and `sizes` have the same keys. + let size = self.sizes.remove(&next).unwrap(); + self.in_use -= size; + Some(next) + } else { + None + } + } + + fn update(&mut self, file: &Path, access_time: SystemTime) -> bool { + self.queue + .change_priority(file, Reverse(access_time)) + .is_some() + } + + fn remove(&mut self, file: &Path) { + if self.queue.remove(file).is_none() { + return; + } + let size = self.sizes.remove(file).unwrap(); + self.in_use -= size; + } +} + +struct FsSizeLimiter { + limiter: Mutex, +} + +impl FsSizeLimiter { + fn get_metadata(file: &Path) -> io::Result<(SystemTime, u64)> { + let metadata = file.metadata()?; + let access_time = metadata + .accessed() + .or_else(|_| metadata.created()) + .unwrap_or_else(|_| SystemTime::now()); + let size = metadata.len(); + + Ok((access_time, size)) + } + + fn init_dir(limiter: &mut SizeLimiter, path: &Path) { + for entry in fs::read_dir(path).into_iter().flatten().flatten() { + if let Ok(file_type) = entry.file_type() { + if file_type.is_dir() { + Self::init_dir(limiter, &entry.path()) + } else if file_type.is_file() { + let path = entry.path(); + if let Ok((access_time, size)) = Self::get_metadata(&path) { + limiter.add(&path, size, access_time); + } + } + } + } + } + + fn add(&self, file: &Path, size: u64) { + self.limiter + .lock() + .unwrap() + .add(file, size, SystemTime::now()); + } + + fn touch(&self, file: &Path) -> bool { + self.limiter.lock().unwrap().update(file, SystemTime::now()) + } + + fn remove(&self, file: &Path) { + self.limiter.lock().unwrap().remove(file); + } + + fn shrink(&self) { + while let Some(file) = self.limiter.lock().unwrap().pop() { + let _ = fs::remove_file(file); + } + } + + fn new(path: &Path, limit: u64) -> Self { + let mut limiter = SizeLimiter::new(limit); + Self::init_dir(&mut limiter, path); + + while let Some(file) = limiter.pop() { + let _ = fs::remove_file(file); + } + + Self { + limiter: Mutex::new(limiter), + } + } +} + /// A cache for volume, credentials and audio files. #[derive(Clone)] pub struct Cache { credentials_location: Option, volume_location: Option, audio_location: Option, + size_limiter: Option>, } +pub struct RemoveFileError(()); + impl Cache { pub fn new>( system_location: Option

, audio_location: Option

, + size_limit: Option, ) -> io::Result { if let Some(location) = &system_location { fs::create_dir_all(location)?; } + let mut size_limiter = None; + if let Some(location) = &audio_location { fs::create_dir_all(location)?; + if let Some(limit) = size_limit { + let limiter = FsSizeLimiter::new(location.as_ref(), limit); + size_limiter = Some(Arc::new(limiter)); + } } let audio_location = audio_location.map(|p| p.as_ref().to_owned()); @@ -37,6 +180,7 @@ impl Cache { credentials_location, volume_location, audio_location, + size_limiter, }; Ok(cache) @@ -121,13 +265,21 @@ impl Cache { } pub fn file(&self, file: FileId) -> Option { - File::open(self.file_path(file)?) - .map_err(|e| { + let path = self.file_path(file)?; + match File::open(&path) { + Ok(file) => { + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.touch(&path); + } + Some(file) + } + Err(e) => { if e.kind() != ErrorKind::NotFound { warn!("Error reading file from cache: {}", e) } - }) - .ok() + None + } + } } pub fn save_file(&self, file: FileId, contents: &mut F) { @@ -142,37 +294,25 @@ impl Cache { .and_then(|_| File::create(&path)) .and_then(|mut file| io::copy(contents, &mut file)); - if let Err(e) = result { - if e.kind() == ErrorKind::Other { - // Perhaps there's no space left in the cache - // TODO: try to narrow down the error (platform-dependently) - info!("An error occured while writing to cache, trying to flush the cache"); - - if fs::remove_dir_all(self.audio_location.as_ref().unwrap()) - .and_then(|_| fs::create_dir_all(parent)) - .and_then(|_| File::create(&path)) - .and_then(|mut file| io::copy(contents, &mut file)) - .is_ok() - { - // It worked, there's no need to print a warning - return; - } + if let Ok(size) = result { + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.add(&path, size); + limiter.shrink(); } - - warn!("Cannot save file to cache: {}", e) } } - pub fn remove_file(&self, file: FileId) -> bool { - if let Some(path) = self.file_path(file) { - if let Err(err) = fs::remove_file(path) { - warn!("Unable to remove file from cache: {}", err); - false - } else { - true - } + pub fn remove_file(&self, file: FileId) -> Result<(), RemoveFileError> { + let path = self.file_path(file).ok_or(RemoveFileError(()))?; + + if let Err(err) = fs::remove_file(&path) { + warn!("Unable to remove file from cache: {}", err); + Err(RemoveFileError(())) } else { - false + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.remove(&path); + } + Ok(()) } } } diff --git a/playback/src/player.rs b/playback/src/player.rs index 3f0778f9..e13d9cbc 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -793,8 +793,7 @@ impl PlayerTrackLoader { e ); - // unwrap safety: The file is cached, so session must have a cache - if !self.session.cache().unwrap().remove_file(file_id) { + if self.session.cache().unwrap().remove_file(file_id).is_err() { return None; } diff --git a/src/main.rs b/src/main.rs index 78e7e2f9..7f5ecdbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -367,7 +367,7 @@ fn get_setup(args: &[String]) -> Setup { .map(|p| p.into()); } - match Cache::new(system_dir, audio_dir) { + match Cache::new(system_dir, audio_dir, Some(50_000_000)) { Ok(cache) => Some(cache), Err(e) => { warn!("Cannot create cache: {}", e);