diff --git a/src/audio_decrypt.rs b/src/audio_decrypt.rs index dd938c99..dfc384ae 100644 --- a/src/audio_decrypt.rs +++ b/src/audio_decrypt.rs @@ -1,7 +1,9 @@ use crypto::aes; use crypto::symmetriccipher::SynchronousStreamCipher; -use readall::ReadAllExt; use std::io; +use std::ops::Add; +use num::FromPrimitive; +use gmp::Mpz; use audio_key::AudioKey; @@ -16,16 +18,10 @@ pub struct AudioDecrypt { } impl AudioDecrypt { - pub fn new(key: AudioKey, mut reader: T) -> AudioDecrypt { - let mut cipher = aes::ctr(aes::KeySize::KeySize128, + pub fn new(key: AudioKey, reader: T) -> AudioDecrypt { + let cipher = aes::ctr(aes::KeySize::KeySize128, &key, AUDIO_AESIV); - - let mut buf = [0; 0xa7]; - let mut buf2 = [0; 0xa7]; - reader.read_all(&mut buf).unwrap(); - cipher.process(&buf, &mut buf2); - AudioDecrypt { cipher: cipher, key: key, @@ -45,9 +41,23 @@ impl io::Read for AudioDecrypt { } } -impl io::Seek for AudioDecrypt { - fn seek(&mut self, _pos: io::SeekFrom) -> io::Result { - Err(io::Error::new(io::ErrorKind::Other, "Cannot seek")) +impl io::Seek for AudioDecrypt { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let newpos = try!(self.reader.seek(pos)); + let skip = newpos % 16; + + let iv = Mpz::from_bytes_be(AUDIO_AESIV) + .add(Mpz::from_u64(newpos / 16).unwrap()) + .to_bytes_be(); + self.cipher = aes::ctr(aes::KeySize::KeySize128, + &self.key, + &iv); + + let buf = vec![0u8; skip as usize]; + let mut buf2 = vec![0u8; skip as usize]; + self.cipher.process(&buf, &mut buf2); + + Ok(newpos as u64) } } diff --git a/src/audio_file.rs b/src/audio_file.rs index 9c3a56be..5d79ff5e 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -1,23 +1,24 @@ use byteorder::{ByteOrder, BigEndian}; use std::cmp::min; use std::collections::BitSet; -use std::io; +use std::io::{self, SeekFrom}; use std::slice::bytes::copy_memory; -use std::sync::{mpsc, Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; +use std::sync::mpsc::{self, TryRecvError}; use stream::{StreamRequest, StreamEvent}; use util::FileId; +use std::thread; + +const CHUNK_SIZE : usize = 0x40000; -const CHUNK_SIZE: usize = 0x40000; #[derive(Clone)] -pub struct AudioFileRef(Arc); - -struct AudioFile { +pub struct AudioFile { file: FileId, size: usize, + seek: mpsc::Sender, - data: Mutex, - cond: Condvar + data: Arc<(Mutex, Condvar)>, } struct AudioFileData { @@ -25,8 +26,8 @@ struct AudioFileData { bitmap: BitSet, } -impl AudioFileRef { - pub fn new(file: FileId, streams: mpsc::Sender) -> AudioFileRef { +impl AudioFile { + pub fn new(file: FileId, streams: mpsc::Sender) -> AudioFile { let (tx, rx) = mpsc::channel(); streams.send(StreamRequest { @@ -51,70 +52,99 @@ impl AudioFileRef { } size.unwrap() as usize }; + + let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE); + let (tx, rx) = mpsc::channel(); - AudioFileRef(Arc::new(AudioFile { + let ret = AudioFile { file: file, size: size, + seek: tx, - data: Mutex::new(AudioFileData { - buffer: vec![0u8; size + (CHUNK_SIZE - size % CHUNK_SIZE)], - bitmap: BitSet::with_capacity(size / CHUNK_SIZE) - }), - cond: Condvar::new(), - })) + data: Arc::new((Mutex::new(AudioFileData { + buffer: vec![0u8; bufsize], + bitmap: BitSet::with_capacity(bufsize / CHUNK_SIZE as usize) + }), Condvar::new())), + }; + + let f = ret.clone(); + + thread::spawn( move || { f.fetch(streams, rx); }); + + ret } - pub fn fetch(&self, streams: mpsc::Sender) { - let &AudioFileRef(ref inner) = self; + fn fetch_chunk(&self, streams: &mpsc::Sender, index: usize) { + let (tx, rx) = mpsc::channel(); + streams.send(StreamRequest { + id: self.file, + offset: (index * CHUNK_SIZE / 4) as u32, + size: (CHUNK_SIZE / 4) as u32, + callback: tx + }).unwrap(); - let mut index : usize = 0; + let mut offset = 0usize; + for event in rx.iter() { + match event { + StreamEvent::Header(_,_) => (), + StreamEvent::Data(data) => { + let mut handle = self.data.0.lock().unwrap(); + copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset ..]); + offset += data.len(); - while index * CHUNK_SIZE < inner.size { - let (tx, rx) = mpsc::channel(); - - streams.send(StreamRequest { - id: inner.file, - offset: (index * CHUNK_SIZE / 4) as u32, - size: (CHUNK_SIZE / 4) as u32, - callback: tx - }).unwrap(); - - let mut offset = 0; - for event in rx.iter() { - match event { - StreamEvent::Header(_,_) => (), - StreamEvent::Data(data) => { - let mut handle = inner.data.lock().unwrap(); - copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset..]); - offset += data.len(); - - if offset >= CHUNK_SIZE { - break - } + if offset >= CHUNK_SIZE { + break } } } - + } + + { + let mut handle = self.data.0.lock().unwrap(); + handle.bitmap.insert(index as usize); + self.data.1.notify_all(); + } + } + + fn fetch(&self, streams: mpsc::Sender, seek: mpsc::Receiver) { + let mut index = 0; + loop { + index = if index * CHUNK_SIZE < self.size { + match seek.try_recv() { + Ok(position) => position as usize / CHUNK_SIZE, + Err(TryRecvError::Empty) => index, + Err(TryRecvError::Disconnected) => break + } + } else { + match seek.recv() { + Ok(position) => position as usize / CHUNK_SIZE, + Err(_) => break + } + }; + { - let mut handle = inner.data.lock().unwrap(); - handle.bitmap.insert(index); - inner.cond.notify_all(); + let handle = self.data.0.lock().unwrap(); + while handle.bitmap.contains(&index) && index * CHUNK_SIZE < self.size { + index += 1; + } } - index += 1; + if index * CHUNK_SIZE < self.size { + self.fetch_chunk(&streams, index) + } } } } pub struct AudioFileReader { - file: AudioFileRef, - position: usize + file: AudioFile, + position: usize, } impl AudioFileReader { - pub fn new(file: &AudioFileRef) -> AudioFileReader { + pub fn new(file: AudioFile) -> AudioFileReader { AudioFileReader { - file: file.clone(), + file: file, position: 0 } } @@ -126,11 +156,10 @@ impl io::Read for AudioFileReader { let offset = self.position % CHUNK_SIZE; let len = min(output.len(), CHUNK_SIZE-offset); - let &AudioFileRef(ref inner) = &self.file; - let mut handle = inner.data.lock().unwrap(); + let mut handle = self.file.data.0.lock().unwrap(); while !handle.bitmap.contains(&index) { - handle = inner.cond.wait(handle).unwrap(); + handle = self.file.data.1.wait(handle).unwrap(); } copy_memory(&handle.buffer[self.position..self.position+len], output); @@ -141,8 +170,16 @@ impl io::Read for AudioFileReader { } impl io::Seek for AudioFileReader { - fn seek(&mut self, _pos: io::SeekFrom) -> io::Result { - Err(io::Error::new(io::ErrorKind::Other, "Cannot seek")) + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let newpos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => self.file.size as i64 + offset, + SeekFrom::Current(offset) => self.position as i64 + offset, + }; + + self.position = min(newpos as usize, self.file.size); + self.file.seek.send(self.position as u64).unwrap(); + Ok(self.position as u64) } } diff --git a/src/connection.rs b/src/connection.rs index ea5670c2..8d4e6cce 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -9,7 +9,6 @@ use std::result; use std::sync::mpsc; use keys::SharedKeys; -use util; #[derive(Debug)] pub enum Error { @@ -69,7 +68,7 @@ impl PlainConnection { pub fn recv_packet(&mut self) -> Result> { let size = try!(self.stream.read_u32::()) as usize; - let mut buffer = util::alloc_buffer(size); + let mut buffer = vec![0u8; size]; BigEndian::write_u32(&mut buffer, size as u32); try!(self.stream.read_all(&mut buffer[4..])); diff --git a/src/player.rs b/src/player.rs index 1a27e31a..668328c8 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,13 +1,13 @@ use portaudio; use std::sync::mpsc; -use std::thread; use vorbis; use audio_key::{AudioKeyRequest, AudioKeyResponse}; use metadata::TrackRef; use session::Session; -use audio_file::{AudioFileRef, AudioFileReader}; +use audio_file::{AudioFile, AudioFileReader}; use audio_decrypt::AudioDecrypt; +use util::Subfile; pub struct Player; @@ -28,14 +28,13 @@ impl Player { key }; - let reader = { - let file = AudioFileRef::new(file_id, session.stream.clone()); - let f = file.clone(); - let s = session.stream.clone(); - thread::spawn( move || { f.fetch(s) }); - AudioDecrypt::new(key, AudioFileReader::new(&file)) - }; - + let mut decoder = + vorbis::Decoder::new( + Subfile::new( + AudioDecrypt::new(key, + AudioFileReader::new( + AudioFile::new(file_id, session.stream.clone()))), 0xa7)).unwrap(); + //decoder.time_seek(60f64).unwrap(); portaudio::initialize().unwrap(); @@ -48,8 +47,6 @@ impl Player { ).unwrap(); stream.start().unwrap(); - let mut decoder = vorbis::Decoder::new(reader).unwrap(); - for pkt in decoder.packets() { match pkt { Ok(packet) => { diff --git a/src/stream.rs b/src/stream.rs index 6398e72a..581b4514 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -92,44 +92,51 @@ impl StreamManager { let mut packet = Cursor::new(&data as &[u8]); let id : ChannelId = packet.read_u16::().unwrap(); - let channel = match self.channels.get_mut(&id) { - Some(ch) => ch, - None => { return; } - }; + let mut close = false; + { + let channel = match self.channels.get_mut(&id) { + Some(ch) => ch, + None => { return; } + }; - match channel.mode { - ChannelMode::Header => { - let mut length = 0; + match channel.mode { + ChannelMode::Header => { + let mut length = 0; - while packet.position() < data.len() as u64 { - length = packet.read_u16::().unwrap(); - if length > 0 { - let header_id = packet.read_u8().unwrap(); - channel.callback.send(StreamEvent::Header( - header_id, - data.clone() - .offset(packet.position() as usize) - .limit(length as usize - 1) - )).unwrap(); + while packet.position() < data.len() as u64 { + length = packet.read_u16::().unwrap(); + if length > 0 { + let header_id = packet.read_u8().unwrap(); + channel.callback.send(StreamEvent::Header( + header_id, + data.clone() + .offset(packet.position() as usize) + .limit(length as usize - 1) + )).unwrap(); - packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap(); + packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap(); + } + } + + if length == 0 { + channel.mode = ChannelMode::Data; } } - - if length == 0 { - channel.mode = ChannelMode::Data; - } - } - ChannelMode::Data => { - if packet.position() < data.len() as u64 { - channel.callback.send(StreamEvent::Data( - data.clone().offset(packet.position() as usize))).unwrap(); - } else { - // TODO: close the channel + ChannelMode::Data => { + if packet.position() < data.len() as u64 { + channel.callback.send(StreamEvent::Data( + data.clone().offset(packet.position() as usize))).unwrap(); + } else { + close = true; + } } } } + + if close { + self.channels.remove(&id); + } } } diff --git a/src/util/mod.rs b/src/util/mod.rs index ff3d9b6d..23d54ef4 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -3,10 +3,12 @@ use rand::{Rng,Rand}; mod int128; mod spotify_id; mod arcvec; +mod subfile; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId}; pub use util::arcvec::ArcVec; +pub use util::subfile::Subfile; #[macro_export] macro_rules! eprintln( @@ -37,15 +39,6 @@ pub fn rand_vec(rng: &mut G, size: usize) -> Vec { return vec } -pub fn alloc_buffer(size: usize) -> Vec { - let mut vec = Vec::with_capacity(size); - unsafe { - vec.set_len(size); - } - - vec -} - pub mod version { include!(concat!(env!("OUT_DIR"), "/version.rs")); diff --git a/src/util/subfile.rs b/src/util/subfile.rs new file mode 100644 index 00000000..dca3442e --- /dev/null +++ b/src/util/subfile.rs @@ -0,0 +1,40 @@ +use std::io::{Read, Seek, SeekFrom, Result}; + +pub struct Subfile { + stream: T, + offset: u64 +} + +impl Subfile { + pub fn new(mut stream: T, offset: u64) -> Subfile { + stream.seek(SeekFrom::Start(offset)).unwrap(); + Subfile { + stream: stream, + offset: offset + } + } +} + +impl Read for Subfile { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.stream.read(buf) + } +} + +impl Seek for Subfile { + fn seek(&mut self, mut pos: SeekFrom) -> Result { + pos = match pos { + SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset), + x => x + }; + + let newpos = try!(self.stream.seek(pos)); + + if newpos > self.offset { + return Ok(newpos - self.offset) + } else { + return Ok(0) + } + } +} +