1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-04 18:29:45 +02:00

Cache audio files to disk.

This commit is contained in:
Paul Lietar 2015-07-07 22:40:31 +01:00
parent 9ae452e22d
commit e452abce43
11 changed files with 293 additions and 164 deletions

View file

@ -1,21 +1,31 @@
use byteorder::{ByteOrder, BigEndian};
use std::cmp::min;
use std::collections::{BitSet, HashMap};
use std::io::{self, SeekFrom};
use std::slice::bytes::copy_memory;
use std::collections::BitSet;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::fs;
use std::io::{self, Read, Write, Seek, SeekFrom};
use std::path::PathBuf;
use tempfile::TempFile;
use stream::StreamEvent;
use util::FileId;
use util::{FileId, IgnoreExt, ZeroFile, mkdir_existing};
use session::Session;
const CHUNK_SIZE : usize = 0x10000;
const CHUNK_SIZE : usize = 0x20000;
pub struct AudioFile<'s> {
position: usize,
pub enum AudioFile<'s> {
Direct(fs::File),
Loading(AudioFileLoading<'s>)
}
struct AudioFileLoading<'s> {
read_file: TempFile,
position: u64,
seek: mpsc::Sender<u64>,
shared: Arc<AudioFileShared>,
#[allow(dead_code)]
@ -25,123 +35,17 @@ pub struct AudioFile<'s> {
struct AudioFileShared {
file_id: FileId,
size: usize,
data: Mutex<AudioFileData>,
cond: Condvar
chunk_count: usize,
cond: Condvar,
bitmap: Mutex<BitSet>,
}
struct AudioFileData {
buffer: Vec<u8>,
bitmap: BitSet,
}
impl <'s> AudioFileLoading<'s> {
fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
let mut files_iter = TempFile::shared(2).unwrap().into_iter();
let read_file = files_iter.next().unwrap();
let mut write_file = files_iter.next().unwrap();
impl <'s> AudioFile <'s> {
fn new(session: &Session, shared: Arc<AudioFileShared>) -> AudioFile {
let shared_ = shared.clone();
let (seek_tx, seek_rx) = mpsc::channel();
let file = AudioFile {
thread: thread::scoped( move || { AudioFile::fetch(session, shared_, seek_rx); }),
position: 0,
seek: seek_tx,
shared: shared,
};
file
}
fn fetch_chunk(session: &Session, shared: &Arc<AudioFileShared>, index: usize) {
let rx = session.stream(shared.file_id,
(index * CHUNK_SIZE / 4) as u32,
(CHUNK_SIZE / 4) as u32);
let mut offset = 0usize;
for event in rx.iter() {
match event {
StreamEvent::Header(_,_) => (),
StreamEvent::Data(data) => {
let mut handle = shared.data.lock().unwrap();
copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset ..]);
offset += data.len();
if offset >= CHUNK_SIZE {
break
}
}
}
}
{
let mut handle = shared.data.lock().unwrap();
handle.bitmap.insert(index as usize);
shared.cond.notify_all();
}
}
fn fetch(session: &Session, shared: Arc<AudioFileShared>, seek: mpsc::Receiver<u64>) {
let mut index = 0;
loop {
index = if index * CHUNK_SIZE < shared.size {
match seek.try_recv() {
Ok(position) => position as usize / CHUNK_SIZE,
Err(TryRecvError::Empty) => index,
Err(TryRecvError::Disconnected) => break
}
} else {
match seek.recv() {
Ok(position) => position as usize / CHUNK_SIZE,
Err(_) => break
}
};
{
let handle = shared.data.lock().unwrap();
while handle.bitmap.contains(&index) && index * CHUNK_SIZE < shared.size {
index += 1;
}
}
if index * CHUNK_SIZE < shared.size {
AudioFile::fetch_chunk(session, &shared, index)
}
}
}
}
impl <'s> io::Read for AudioFile <'s> {
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
let index = self.position / CHUNK_SIZE;
let offset = self.position % CHUNK_SIZE;
let len = min(output.len(), CHUNK_SIZE-offset);
let mut handle = self.shared.data.lock().unwrap();
while !handle.bitmap.contains(&index) {
handle = self.shared.cond.wait(handle).unwrap();
}
copy_memory(&handle.buffer[self.position..self.position+len], output);
self.position += len;
Ok(len)
}
}
impl <'s> io::Seek for AudioFile <'s> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
let newpos = match pos {
SeekFrom::Start(offset) => offset as i64,
SeekFrom::End(offset) => self.shared.size as i64 + offset,
SeekFrom::Current(offset) => self.position as i64 + offset,
};
self.position = min(newpos as usize, self.shared.size);
self.seek.send(self.position as u64).unwrap();
Ok(self.position as u64)
}
}
impl AudioFileShared {
fn new(session: &Session, file_id: FileId) -> Arc<AudioFileShared> {
let size = session.stream(file_id, 0, 1).into_iter()
.filter_map(|event| {
match event {
@ -152,42 +56,178 @@ impl AudioFileShared {
}
}).next().unwrap();
let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE);
let chunk_count = (size + CHUNK_SIZE / 2) / CHUNK_SIZE;
Arc::new(AudioFileShared {
let shared = Arc::new(AudioFileShared {
file_id: file_id,
size: size,
data: Mutex::new(AudioFileData {
buffer: vec![0u8; bufsize],
bitmap: BitSet::with_capacity(bufsize / CHUNK_SIZE as usize)
}),
chunk_count: chunk_count,
cond: Condvar::new(),
})
}
}
bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
});
pub struct AudioFileManager {
cache: HashMap<FileId, Arc<AudioFileShared>>
}
io::copy(&mut ZeroFile::new(size as u64), &mut write_file).unwrap();
impl AudioFileManager {
pub fn new() -> AudioFileManager {
AudioFileManager {
cache: HashMap::new()
let (seek_tx, seek_rx) = mpsc::channel();
AudioFileLoading {
read_file: read_file,
position: 0,
seek: seek_tx,
shared: shared.clone(),
thread: thread::scoped(move || {
AudioFileLoading::fetch(session, shared, write_file, seek_rx)
})
}
}
pub fn request<'a> (&mut self, session: &'a Session, file_id: FileId) -> AudioFile<'a> {
let shared = self.cache
.get(&file_id)
.cloned()
.unwrap_or_else(|| {
println!("Cache miss");
let shared = AudioFileShared::new(session, file_id.clone());
self.cache.insert(file_id, shared.clone());
shared
});
AudioFile::new(session, shared)
fn fetch(session: &Session, shared: Arc<AudioFileShared>,
mut write_file: TempFile, seek_rx: mpsc::Receiver<u64>) {
let mut index = 0;
loop {
match seek_rx.try_recv() {
Ok(position) => {
index = position as usize / CHUNK_SIZE;
}
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => (),
}
let bitmap = shared.bitmap.lock().unwrap();
if bitmap.len() >= shared.chunk_count {
drop(bitmap);
AudioFileLoading::store(session, &shared, &mut write_file);
break;
}
while bitmap.contains(&index) {
index = (index + 1) % shared.chunk_count;
}
drop(bitmap);
AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
}
}
fn fetch_chunk(session: &Session, shared: &Arc<AudioFileShared>,
write_file: &mut TempFile, index: usize) {
let rx = session.stream(shared.file_id,
(index * CHUNK_SIZE / 4) as u32,
(CHUNK_SIZE / 4) as u32);
println!("Chunk {}", index);
write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap();
let mut size = 0usize;
for event in rx.iter() {
match event {
StreamEvent::Header(..) => (),
StreamEvent::Data(data) => {
write_file.write_all(&data).unwrap();
size += data.len();
if size >= CHUNK_SIZE {
break
}
}
}
}
let mut bitmap = shared.bitmap.lock().unwrap();
bitmap.insert(index as usize);
shared.cond.notify_all();
}
fn store(session: &Session, shared: &AudioFileShared, write_file: &mut TempFile) {
write_file.seek(SeekFrom::Start(0)).unwrap();
mkdir_existing(&AudioFileManager::cache_dir(session, shared.file_id)).unwrap();
let mut f = fs::File::create(AudioFileManager::cache_path(session, shared.file_id)).unwrap();
io::copy(write_file, &mut f).unwrap();
}
}
impl <'s> Read for AudioFileLoading<'s> {
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
let index = self.position as usize / CHUNK_SIZE;
let offset = self.position as usize % CHUNK_SIZE;
let len = min(output.len(), CHUNK_SIZE-offset);
let mut bitmap = self.shared.bitmap.lock().unwrap();
while !bitmap.contains(&index) {
bitmap = self.shared.cond.wait(bitmap).unwrap();
}
drop(bitmap);
let len = try!(self.read_file.read(&mut output[..len]));
self.position += len as u64;
Ok(len)
}
}
impl <'s> Seek for AudioFileLoading<'s> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.position = try!(self.read_file.seek(pos));
/*
* Notify the fetch thread to get the correct block
* This can fail if fetch thread has completed, in which case the
* block is ready. Just ignore the error.
*/
self.seek.send(self.position).ignore();
Ok(self.position as u64)
}
}
impl <'s> Read for AudioFile<'s> {
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
match *self {
AudioFile::Direct(ref mut file) => file.read(output),
AudioFile::Loading(ref mut loading) => loading.read(output),
}
}
}
impl <'s> Seek for AudioFile<'s> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
match *self {
AudioFile::Direct(ref mut file) => file.seek(pos),
AudioFile::Loading(ref mut loading) => loading.seek(pos),
}
}
}
pub struct AudioFileManager;
impl AudioFileManager {
pub fn new() -> AudioFileManager {
AudioFileManager
}
pub fn cache_dir(session: &Session, file_id: FileId) -> PathBuf {
let name = file_id.to_base16();
session.config.cache_location.join(&name[0..2])
}
pub fn cache_path(session: &Session, file_id: FileId) -> PathBuf {
let name = file_id.to_base16();
AudioFileManager::cache_dir(session, file_id).join(&name[2..])
}
pub fn request<'a> (&mut self, session: &'a Session, file_id: FileId) -> AudioFile<'a> {
match fs::File::open(AudioFileManager::cache_path(session, file_id)) {
Ok(f) => AudioFile::Direct(f),
Err(..) => AudioFile::Loading(AudioFileLoading::new(session, file_id))
}
}
}