diff --git a/Cargo.lock b/Cargo.lock index 1f4824f5..67573c29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,7 @@ version = "0.1.0" dependencies = [ "bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "lewton 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-core 0.1.0", @@ -475,6 +476,7 @@ dependencies = [ "num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tremor 0.1.0 (git+https://github.com/plietar/rust-tremor)", "vorbis 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 5c61b694..59ebfe84 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -9,6 +9,7 @@ path = "../core" [dependencies] bit-set = "0.4.0" byteorder = "1.0" +bytes = "0.4" futures = "0.1.8" lewton = "0.9.3" log = "0.3.5" @@ -16,6 +17,7 @@ num-bigint = "0.1.35" num-traits = "0.1.36" rust-crypto = "0.2.36" tempfile = "2.1" +tokio = "0.1.2" tremor = { git = "https://github.com/plietar/rust-tremor", optional = true } vorbis = { version ="0.1.0", optional = true } diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 1aa0c0c0..ffdbe4b1 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,5 +1,5 @@ -use bit_set::BitSet; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; @@ -7,13 +7,20 @@ use std::cmp::min; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; use tempfile::NamedTempFile; +use range_set::{Range, RangeSet}; use core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use core::session::Session; use core::spotify_id::FileId; +use futures::sync::mpsc::unbounded; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; -const CHUNK_SIZE: usize = 0x20000; +const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; +const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128; +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5; pub enum AudioFile { Cached(fs::File), @@ -27,37 +34,187 @@ pub enum AudioFileOpen { pub struct AudioFileOpenStreaming { session: Session, - data_rx: Option, + initial_data_rx: Option, + initial_data_length: Option, + initial_request_sent_time: Instant, headers: ChannelHeaders, file_id: FileId, complete_tx: Option>, } + +enum StreamLoaderCommand{ + Fetch(Range), // signal the stream loader to fetch a range of the file + RandomAccessMode(), // optimise download strategy for random access + StreamMode(), // optimise download strategy for streaming + StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second. + Close(), // terminate and don't load any more data +} + + +#[derive(Clone)] +pub struct StreamLoaderController { + channel_tx: Option>, + stream_shared: Option>, + file_size: usize, +} + + +impl StreamLoaderController { + pub fn len(&self) -> usize { + return self.file_size; + } + + pub fn range_available(&self, range: Range) -> bool { + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + if range.length <= download_status.downloaded.contained_length_from_value(range.start) { + return true; + } else { + return false; + } + } else { + if range.length <= self.len() - range.start { + return true; + } else { + return false; + } + } + } + + pub fn ping_time_ms(&self) -> usize { + if let Some(ref shared) = self.stream_shared { + return shared.ping_time_ms.load(atomic::Ordering::Relaxed); + } else { + return 0; + } + } + + fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(command); + } + } + + pub fn fetch(&mut self, range: Range) { + // signal the stream loader to fetch a range of the file + self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); + } + + pub fn fetch_blocking(&mut self, mut range: Range) { + // signal the stream loader to tech a range of the file and block until it is loaded. + + // ensure the range is within the file's bounds. + if range.start >= self.len() { + range.length = 0; + } else if range.end() > self.len() { + range.length = self.len() - range.start; + } + + self.fetch(range); + + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + while range.length > download_status.downloaded.contained_length_from_value(range.start) { + download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { + // For some reason, the requested range is neither downloaded nor requested. + // This could be due to a network error. Request it again. + // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly. + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); + } + } + } + } + + } + + pub fn fetch_next(&mut self, length: usize) { + let range:Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch(range); + } + + pub fn fetch_next_blocking(&mut self, length: usize) { + let range:Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch_blocking(range); + } + + pub fn set_random_access_mode(&mut self) { + // optimise download strategy for random access + self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); + } + + pub fn set_stream_mode(&mut self) { + // optimise download strategy for streaming + self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); + } + + pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) { + // when optimising for streaming, assume a streaming rate of this many bytes per second. + self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second)); + } + + pub fn close(&mut self) { + // terminate stream loading and don't load any more data for this file. + self.send_stream_loader_command(StreamLoaderCommand::Close()); + } + + +} + + pub struct AudioFileStreaming { read_file: fs::File, position: u64, - seek: mpsc::UnboundedSender, + + stream_loader_command_tx: mpsc::UnboundedSender, shared: Arc, } + +struct AudioFileDownloadStatus { + requested: RangeSet, + downloaded: RangeSet, +} + struct AudioFileShared { file_id: FileId, - chunk_count: usize, + file_size: usize, cond: Condvar, - bitmap: Mutex, + download_status: Mutex, + ping_time_ms: AtomicUsize, + read_position: AtomicUsize, } impl AudioFileOpenStreaming { fn finish(&mut self, size: usize) -> AudioFileStreaming { - let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE; let shared = Arc::new(AudioFileShared { file_id: self.file_id, - chunk_count: chunk_count, + file_size: size, cond: Condvar::new(), - bitmap: Mutex::new(BitSet::with_capacity(chunk_count)), + download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + ping_time_ms: AtomicUsize::new(0), + read_position: AtomicUsize::new(0), }); let mut write_file = NamedTempFile::new().unwrap(); @@ -66,16 +223,20 @@ impl AudioFileOpenStreaming { let read_file = write_file.reopen().unwrap(); - let data_rx = self.data_rx.take().unwrap(); + let initial_data_rx = self.initial_data_rx.take().unwrap(); + let initial_data_length = self.initial_data_length.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); - let (seek_tx, seek_rx) = mpsc::unbounded(); + //let (seek_tx, seek_rx) = mpsc::unbounded(); + let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::(); let fetcher = AudioFileFetch::new( self.session.clone(), shared.clone(), - data_rx, + initial_data_rx, + self.initial_request_sent_time, + initial_data_length, write_file, - seek_rx, + stream_loader_command_rx, complete_tx, ); self.session.spawn(move |_| fetcher); @@ -84,7 +245,8 @@ impl AudioFileOpenStreaming { read_file: read_file, position: 0, - seek: seek_tx, + //seek: seek_tx, + stream_loader_command_tx: stream_loader_command_tx, shared: shared, } @@ -139,14 +301,17 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let (headers, data) = request_chunk(session, file_id, 0).split(); + let initial_data_length = MINIMUM_CHUNK_SIZE; + let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let open = AudioFileOpenStreaming { session: session.clone(), file_id: file_id, headers: headers, - data_rx: Some(data), + initial_data_rx: Some(data), + initial_data_length: Some(initial_data_length), + initial_request_sent_time: Instant::now(), complete_tx: Some(complete_tx), }; @@ -167,13 +332,36 @@ impl AudioFile { AudioFileOpen::Streaming(open) } + + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + match self { + AudioFile::Streaming(stream) => { + return StreamLoaderController { + channel_tx: Some(stream.stream_loader_command_tx.clone()), + stream_shared: Some(stream.shared.clone()), + file_size: stream.shared.file_size, + } + } + AudioFile::Cached(ref file) => { + return StreamLoaderController { + channel_tx: None, + stream_shared: None, + file_size: file.metadata().unwrap().len() as usize, + } + } + } + } } -fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { - trace!("requesting chunk {}", index); - let start = (index * CHUNK_SIZE / 4) as u32; - let end = ((index + 1) * CHUNK_SIZE / 4) as u32; +fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { + trace!("requesting range starting at {} of length {}", offset, length); + + let start = offset / 4; + let mut end = (offset+length) / 4; + if (offset+length) % 4 != 0 { + end += 1; + } let (id, channel) = session.channel().allocate(); @@ -186,81 +374,405 @@ fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); data.write(&file.0).unwrap(); - data.write_u32::(start).unwrap(); - data.write_u32::(end).unwrap(); + data.write_u32::(start as u32).unwrap(); + data.write_u32::(end as u32).unwrap(); session.send_packet(0x8, data); channel } + + +struct PartialFileData { + offset: usize, + data: Bytes, +} + +enum ReceivedData { + ResponseTimeMs(usize), + Data(PartialFileData), +} + +struct AudioFileFetchDataReceiver { + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + data_offset: usize, + request_length: usize, + request_sent_time: Option, +} + +impl AudioFileFetchDataReceiver { + fn new( + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + data_offset: usize, + request_length: usize, + request_sent_time: Instant, + ) -> AudioFileFetchDataReceiver { + + AudioFileFetchDataReceiver { + shared: shared, + data_rx: data_rx, + file_data_tx: file_data_tx, + data_offset: data_offset, + request_length: request_length, + request_sent_time: Some(request_sent_time), + } + } +} + + + +impl AudioFileFetchDataReceiver { + fn finish(&mut self) { + if self.request_length > 0 { + + let missing_range = Range::new(self.data_offset, self.request_length); + + let mut download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + self.shared.cond.notify_all(); + } + } +} + +impl Future for AudioFileFetchDataReceiver { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length); + match self.data_rx.poll() { + Ok(Async::Ready(Some(data))) => { + if let Some(request_sent_time) = self.request_sent_time { + let duration = Instant::now() - request_sent_time; + let mut duration_ms: u64; + if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + }else { + duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64; + } + let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + } + let data_size = data.len(); + trace!("data_receiver got {} bytes of data", data_size); + let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); + self.data_offset += data_size; + if self.request_length < data_size { + warn!("Received more data from server than requested."); + self.request_length = 0; + } else { + self.request_length -= data_size; + } + if self.request_length == 0 { + trace!("Data receiver completed at position {}", self.data_offset); + return Ok(Async::Ready(())); + } + } + Ok(Async::Ready(None)) => { + if self.request_length > 0 { + warn!("Received less data from server than requested."); + self.finish(); + } + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => { + //trace!("No more data for data_receiver at the moment."); + return Ok(Async::NotReady); + } + Err(ChannelError) => { + warn!("error from channel"); + self.finish(); + return Ok(Async::Ready(())); + } + } + } + } +} + + +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + struct AudioFileFetch { session: Session, shared: Arc, output: Option, - index: usize, - data_rx: ChannelData, + file_data_tx: mpsc::UnboundedSender, + file_data_rx: mpsc::UnboundedReceiver, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, + download_strategy: DownloadStrategy, + streaming_data_rate: usize, + network_response_times_ms: Vec, } impl AudioFileFetch { fn new( session: Session, shared: Arc, - data_rx: ChannelData, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, + output: NamedTempFile, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { + + let (file_data_tx, file_data_rx) = unbounded::(); + + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } + + + let initial_data_receiver = AudioFileFetchDataReceiver::new( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + ); + + session.spawn(move |_| initial_data_receiver); + AudioFileFetch { session: session, shared: shared, output: Some(output), - index: 0, - data_rx: data_rx, + file_data_tx: file_data_tx, + file_data_rx: file_data_rx, - seek_rx: seek_rx, + stream_loader_command_rx: stream_loader_command_rx, complete_tx: Some(complete_tx), + download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise + streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise. + network_response_times_ms: Vec::new(), } } - fn download(&mut self, mut new_index: usize) { - assert!(new_index < self.shared.chunk_count); + fn download_range(&mut self, mut offset: usize, mut length: usize) { + if length < MINIMUM_CHUNK_SIZE { + length = MINIMUM_CHUNK_SIZE; + } + + // ensure the values are within the bounds and align them by 4 for the spotify protocol. + if offset >= self.shared.file_size { + return; + } + + if length <= 0 { + return; + } + + if offset + length > self.shared.file_size { + length = self.shared.file_size - offset; + } + + if offset % 4 != 0 { + length += offset % 4; + offset -= offset % 4; + } + + if length % 4 != 0 { + length += 4 - (length % 4); + } + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + + for range in ranges_to_request.iter() { + let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split(); + + download_status.requested.add_range(range); + + + let receiver = AudioFileFetchDataReceiver::new( + self.shared.clone(), + self.file_data_tx.clone(), + data, + range.start, + range.length, + Instant::now(), + ); + + self.session.spawn(move |_| receiver); + } + + } + + fn pre_fetch_more_data(&mut self) { + + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0,self.shared.file_size)); { - let bitmap = self.shared.bitmap.lock().unwrap(); - while bitmap.contains(new_index) { - new_index = (new_index + 1) % self.shared.chunk_count; + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } + + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); + let tail_end = tail_end.intersection(&missing_data); + + if ! tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, MAXIMUM_CHUNK_SIZE); + self.download_range(offset, length); + + } else if ! missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, MAXIMUM_CHUNK_SIZE); + self.download_range(offset, length); + } + + } + + fn poll_file_data_rx(&mut self) -> Poll<(), ()> { + + loop { + match self.file_data_rx.poll() { + Ok(Async::Ready(None)) => { + trace!("File data channel closed."); + return Ok(Async::Ready(())); + } + Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { + trace!("Received ping time information: {} ms.", response_time_ms); + + // record the response time + self.network_response_times_ms.push(response_time_ms); + + // prone old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); + } + + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize, + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort(); + times[1] + } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed); + + }, + Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { + + trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len()); + + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap(); + + + + let mut full = false; + + { + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { + full = true; + } + drop(download_status); + } + + if full { + self.finish(); + return Ok(Async::Ready(())); + } + + + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + }, + Err(()) => unreachable!(), + } + + } + + } + + + fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { + + loop { + match self.stream_loader_command_rx.poll() { + Ok(Async::Ready(None)) => {} + Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { + self.download_range(request.start, request.length); + } + Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { + self.download_strategy = DownloadStrategy::RandomAccess(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { + self.download_strategy = DownloadStrategy::Streaming(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => { + self.streaming_data_rate = rate; + } + Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady) + }, + Err(()) => unreachable!(), } } - if self.index != new_index { - self.index = new_index; - - let offset = self.index * CHUNK_SIZE; - - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(offset as u64)) - .unwrap(); - - let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); - self.data_rx = data; - } } fn finish(&mut self) { + trace!("====== FINISHED DOWNLOADING FILE! ======"); let mut output = self.output.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } + } impl Future for AudioFileFetch { @@ -268,80 +780,92 @@ impl Future for AudioFileFetch { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - loop { - let mut progress = false; - match self.seek_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(offset))) => { - progress = true; - let index = offset as usize / CHUNK_SIZE; - self.download(index); - } - Ok(Async::NotReady) => (), - Err(()) => unreachable!(), + trace!("Polling AudioFileFetch"); + + match self.poll_stream_loader_command_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - match self.data_rx.poll() { - Ok(Async::Ready(Some(data))) => { - progress = true; - - self.output.as_mut().unwrap().write_all(data.as_ref()).unwrap(); - } - Ok(Async::Ready(None)) => { - progress = true; - - trace!("chunk {} / {} complete", self.index, self.shared.chunk_count); - - let full = { - let mut bitmap = self.shared.bitmap.lock().unwrap(); - bitmap.insert(self.index as usize); - self.shared.cond.notify_all(); - - bitmap.len() >= self.shared.chunk_count - }; - - if full { - self.finish(); - return Ok(Async::Ready(())); - } - - let new_index = (self.index + 1) % self.shared.chunk_count; - self.download(new_index); - } - Ok(Async::NotReady) => (), - Err(ChannelError) => { - warn!("error from channel"); - return Ok(Async::Ready(())); - } + match self.poll_file_data_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - if !progress { - return Ok(Async::NotReady); + + if let DownloadStrategy::Streaming() = self.download_strategy { + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.minus(&download_status.downloaded).len() + }; + + let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed); + + if bytes_pending < 2 * ping_time * self.streaming_data_rate { + self.pre_fetch_more_data(); } } + + + return Ok(Async::NotReady) } } impl Read for AudioFileStreaming { fn read(&mut self, output: &mut [u8]) -> io::Result { - let index = self.position as usize / CHUNK_SIZE; - let offset = self.position as usize % CHUNK_SIZE; - let len = min(output.len(), CHUNK_SIZE - offset); + let offset = self.position as usize; - let mut bitmap = self.shared.bitmap.lock().unwrap(); - while !bitmap.contains(index) { - bitmap = self.shared.cond.wait(bitmap).unwrap(); + if offset >= self.shared.file_size { + return Ok(0); } - drop(bitmap); - let read_len = try!(self.read_file.read(&mut output[..len])); + let length = min(output.len(), self.shared.file_size - offset); + + if length == 0 { + return Ok(0); + } + + + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + + let mut download_status = self.shared.download_status.lock().unwrap(); + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + + for range in ranges_to_request.iter() { + debug!("requesting data at position {} (length : {})", range.start, range.length); + self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); + } + + while !download_status.downloaded.contains(offset) { + download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + } + let available_length = download_status.downloaded.contained_length_from_value(offset); + assert!(available_length > 0); + drop(download_status); + + + self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); + let read_len = min(length, available_length); + let read_len = try!(self.read_file.read(&mut output[..read_len])); + self.position += read_len as u64; + self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); - Ok(read_len) + + return Ok(read_len); } } @@ -349,15 +873,7 @@ impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.position = try!(self.read_file.seek(pos)); // Do not seek past EOF - if (self.position as usize % CHUNK_SIZE) != 0 { - // Notify the fetch thread to get the correct block - // This can fail if fetch thread has completed, in which case the - // block is ready. Just ignore the error. - let _ = self.seek.unbounded_send(self.position); - } else { - warn!("Trying to seek past EOF"); - } - + self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); Ok(self.position) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 5b582dc0..f316e143 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -5,10 +5,12 @@ extern crate log; extern crate bit_set; extern crate byteorder; +extern crate bytes; extern crate crypto; extern crate num_bigint; extern crate num_traits; extern crate tempfile; +extern crate tokio; extern crate librespot_core as core; @@ -20,10 +22,13 @@ mod lewton_decoder; #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] mod libvorbis_decoder; +mod range_set; + pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, AudioFileOpen}; +pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] pub use libvorbis_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; + diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs new file mode 100644 index 00000000..378725f6 --- /dev/null +++ b/audio/src/range_set.rs @@ -0,0 +1,241 @@ + +use std::cmp::{max,min}; +use std::slice::Iter; + + + +#[derive(Copy, Clone)] +pub struct Range { + pub start: usize, + pub length: usize, +} + +impl Range { + + pub fn new(start: usize, length: usize) -> Range { + return Range { + start: start, + length: length, + } + } + + pub fn end(&self) -> usize { + return self.start + self.length; + } + +} + +#[derive(Clone)] +pub struct RangeSet { + ranges: Vec, +} + + +impl RangeSet { + pub fn new() -> RangeSet { + RangeSet{ + ranges: Vec::::new(), + } + } + + pub fn is_empty(&self) -> bool { + return self.ranges.is_empty(); + } + + pub fn len(&self) -> usize { + let mut result = 0; + for range in self.ranges.iter() { + result += range.length; + } + return result; + } + + pub fn get_range(&self, index: usize) -> Range { + return self.ranges[index].clone(); + } + + pub fn iter(&self) -> Iter { + return self.ranges.iter(); + } + + pub fn contains(&self, value: usize) -> bool { + for range in self.ranges.iter() { + if value < range.start { + return false; + } else if range.start <= value && value < range.end() { + return true; + } + } + return false; + } + + pub fn contained_length_from_value(&self, value: usize) -> usize { + for range in self.ranges.iter() { + if value < range.start { + return 0; + } else if range.start <= value && value < range.end() { + return range.end() - value; + } + } + return 0; + + } + + #[allow(dead_code)] + pub fn contains_range_set(&self, other: &RangeSet) -> bool { + for range in other.ranges.iter() { + if self.contained_length_from_value(range.start) < range.length { + return false; + } + } + return true; + } + + + pub fn add_range(&mut self, range:&Range) { + + if range.length <= 0 { + // the interval is empty or invalid -> nothing to do. + return; + } + + + for index in 0..self.ranges.len() { + // the new range is clear of any ranges we already iterated over. + if range.end() < self.ranges[index].start{ + // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it. + self.ranges.insert(index, range.clone()); + return; + + } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() { + // the new range overlaps (or touches) the first range. They are to be merged. + // In addition we might have to merge further ranges in as well. + + let mut new_range = range.clone(); + + while index < self.ranges.len() && self.ranges[index].start <= new_range.end() { + let new_end = max(new_range.end(), self.ranges[index].end()); + new_range.start = min(new_range.start, self.ranges[index].start); + new_range.length = new_end - new_range.start; + self.ranges.remove(index); + } + + self.ranges.insert(index, new_range); + return; + + } + } + + // the new range is after everything else -> just add it + self.ranges.push(range.clone()); + } + + #[allow(dead_code)] + pub fn add_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.add_range(range); + } + } + + #[allow(dead_code)] + pub fn union(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.add_range_set(other); + return result; + } + + pub fn subtract_range(&mut self, range: &Range) { + + if range.length <= 0 { + return; + } + + for index in 0..self.ranges.len() { + // the ranges we already passed don't overlap with the range to remove + + if range.end() <= self.ranges[index].start { + // the remaining ranges are past the one to subtract. -> we're done. + return + + } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() { + // the range to subtract started before the current range and reaches into the current range + // -> we have to remove the beginning of the range or the entire range and do the same for following ranges. + + while index < self.ranges.len() && self.ranges[index].end() <= range.end() { + self.ranges.remove(index); + } + + if index < self.ranges.len() && self.ranges[index].start < range.end() { + self.ranges[index].start = range.end(); + } + + return; + + } else if range.end() < self.ranges[index].end() { + // the range to subtract punches a hole into the current range -> we need to create two smaller ranges. + + let first_range = Range { + start: self.ranges[index].start, + length: range.start - self.ranges[index].start, + }; + + self.ranges[index].start = range.end(); + + self.ranges.insert(index, first_range); + + return; + + } else if range.start < self.ranges[index].end() { + // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges. + self.ranges[index].length = range.start - self.ranges[index].start; + + } + } + } + + pub fn subtract_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.subtract_range(range); + } + } + + pub fn minus(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.subtract_range_set(other); + return result; + } + + pub fn intersection(&self, other: &RangeSet) -> RangeSet { + let mut result = RangeSet::new(); + + let mut self_index: usize = 0; + let mut other_index: usize = 0; + + while self_index < self.ranges.len() && other_index < other.ranges.len() { + if self.ranges[self_index].end() <= other.ranges[other_index].start { + // skip the interval + self_index += 1; + } else if other.ranges[other_index].end() <= self.ranges[self_index].start { + // skip the interval + other_index += 1; + } else { + // the two intervals overlap. Add the union and advance the index of the one that ends first. + let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start); + let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end()); + assert!(new_start <= new_end); + result.add_range(&Range::new(new_start, new_end-new_start)); + if self.ranges[self_index].end() <= other.ranges[other_index].end() { + self_index += 1; + } else { + other_index += 1; + } + + } + + } + + return result; + } + +} + diff --git a/core/src/channel.rs b/core/src/channel.rs index 57655feb..3238a0a6 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -59,6 +59,8 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); + trace!("Received data for channel {}: {} bytes.", id, data.len()); + self.lock(|inner| { if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); diff --git a/playback/src/player.rs b/playback/src/player.rs index a421c9ab..5d0e58ab 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -14,12 +14,14 @@ use config::{Bitrate, PlayerConfig}; use core::session::Session; use core::spotify_id::SpotifyId; -use audio::{AudioDecrypt, AudioFile}; +use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use audio::{VorbisDecoder, VorbisPacket}; use audio_backend::Sink; use metadata::{FileFormat, Metadata, Track}; use mixer::AudioFilter; + + pub struct Player { commands: Option>, thread_handle: Option>, @@ -202,12 +204,14 @@ enum PlayerState { decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, }, Playing { track_id: SpotifyId, decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, }, EndOfTrack { track_id: SpotifyId, @@ -234,6 +238,15 @@ impl PlayerState { } } + fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { + use self::PlayerState::*; + match *self { + Stopped | EndOfTrack { .. } => None, + Paused { ref mut stream_loader_controller, .. } | Playing { ref mut stream_loader_controller, .. } => Some(stream_loader_controller), + Invalid => panic!("invalid state"), + } + } + fn playing_to_end_of_track(&mut self) { use self::PlayerState::*; match mem::replace(self, Invalid) { @@ -257,12 +270,14 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, } => { *self = Playing { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } _ => panic!("invalid state"), @@ -277,12 +292,14 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, } => { *self = Paused { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } _ => panic!("invalid state"), @@ -403,7 +420,7 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor)) => { + Some((decoder, normalisation_factor, stream_loader_controller)) => { if play { match self.state { PlayerState::Playing { @@ -427,6 +444,7 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } else { self.state = PlayerState::Paused { @@ -434,6 +452,7 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; match self.state { PlayerState::Playing { @@ -460,6 +479,9 @@ impl PlayerInternal { } PlayerCommand::Seek(position) => { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } if let Some(decoder) = self.state.decoder() { match decoder.seek(position as i64) { Ok(_) => (), @@ -468,6 +490,17 @@ impl PlayerInternal { } else { warn!("Player::seek called from invalid state"); } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + let stream_data_rate = self.stream_data_rate(); + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + if let PlayerState::Playing{..} = self.state { + let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; + stream_loader_controller.fetch_next_blocking(wait_for_data_length); + } + } + } PlayerCommand::Play => { @@ -526,7 +559,15 @@ impl PlayerInternal { } } - fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<(Decoder, f32)> { + fn stream_data_rate(&self) -> usize { + match self.config.bitrate { + Bitrate::Bitrate96 => 12 * 1024, + Bitrate::Bitrate160 => 20 * 1024, + Bitrate::Bitrate320 => 40 * 1024, + } + } + + fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController)> { let track = Track::get(&self.session, track_id).wait().unwrap(); info!( @@ -565,6 +606,21 @@ impl PlayerInternal { let encrypted_file = encrypted_file.wait().unwrap(); + + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); + + // tell the stream loader how to optimise its strategy. + stream_loader_controller.set_stream_data_rate(self.stream_data_rate()); + + if position == 0 { + // No need to seek -> we stream from the beginning + stream_loader_controller.set_stream_mode(); + } else { + // we need to seek -> we set stream mode after the initial seek. + stream_loader_controller.set_random_access_mode(); + } + + let key = key.wait().unwrap(); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); @@ -585,11 +641,12 @@ impl PlayerInternal { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } + stream_loader_controller.set_stream_mode(); } info!("Track \"{}\" loaded", track.name); - Some((decoder, normalisation_factor)) + Some((decoder, normalisation_factor, stream_loader_controller)) } }