diff --git a/src/audio_key.rs b/src/audio_key.rs index 118e051b..313cead1 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -1,47 +1,72 @@ -use std::collections::HashMap; +use std::collections::{HashMap, LinkedList}; use std::sync::{mpsc, Future}; use std::io::{Cursor, Write}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use readall::ReadAllExt; +use std::mem; use util::{SpotifyId, FileId, IgnoreExt}; use session::Session; use connection::PacketHandler; pub type AudioKey = [u8; 16]; -type AudioKeyId = u32; + +#[derive(Debug,Hash,PartialEq,Eq,Clone)] +struct AudioKeyId(SpotifyId, FileId); + +enum AudioKeyStatus { + Loading(LinkedList>), + Loaded(AudioKey) +} pub struct AudioKeyManager { - next_seq: AudioKeyId, - callbacks: HashMap>, + next_seq: u32, + pending: HashMap, + cache: HashMap, } impl AudioKeyManager { pub fn new() -> AudioKeyManager { AudioKeyManager { next_seq: 1, - callbacks: HashMap::new(), + pending: HashMap::new(), + cache: HashMap::new() } } pub fn request(&mut self, session: &Session, track: SpotifyId, file: FileId) -> Future { - let (tx, rx) = mpsc::channel(); - let seq = self.next_seq; - self.next_seq += 1; + let id = AudioKeyId(track, file); + self.cache.get_mut(&id).map(|status| match status { + &mut AudioKeyStatus::Loaded(key) => { + Future::from_value(key.clone()) + } + &mut AudioKeyStatus::Loading(ref mut req) => { + let (tx, rx) = mpsc::channel(); + req.push_front(tx); + Future::from_receiver(rx) + } + }).unwrap_or_else(|| { + let seq = self.next_seq; + self.next_seq += 1; - let mut data : Vec = Vec::new(); - data.write(&file).unwrap(); - data.write(&track.to_raw()).unwrap(); - data.write_u32::(seq).unwrap(); - data.write_u16::(0x0000).unwrap(); + let mut data : Vec = Vec::new(); + data.write(&file).unwrap(); + data.write(&track.to_raw()).unwrap(); + data.write_u32::(seq).unwrap(); + data.write_u16::(0x0000).unwrap(); - session.send_packet(0xc, &data).unwrap(); + session.send_packet(0xc, &data).unwrap(); - self.callbacks.insert(seq, tx); + self.pending.insert(seq, id.clone()); - Future::from_receiver(rx) + let (tx, rx) = mpsc::channel(); + let mut req = LinkedList::new(); + req.push_front(tx); + self.cache.insert(id, AudioKeyStatus::Loading(req)); + Future::from_receiver(rx) + }) } } @@ -54,10 +79,15 @@ impl PacketHandler for AudioKeyManager { let mut key = [0u8; 16]; data.read_all(&mut key).unwrap(); - match self.callbacks.remove(&seq) { - Some(callback) => callback.send(key).ignore(), - None => () - }; + if let Some(status) = self.pending.remove(&seq).and_then(|id| { self.cache.get_mut(&id) }) { + let status = mem::replace(status, AudioKeyStatus::Loaded(key)); + + if let AudioKeyStatus::Loading(cbs) = status { + for cb in cbs { + cb.send(key).unwrap(); + } + } + } } }