diff --git a/src/lib.rs b/src/lib.rs index 66a8c52d..0459ca75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ #![crate_name = "librespot"] -#![feature(plugin,zero_one,iter_arith,mpsc_select)] +#![feature(plugin,zero_one,iter_arith)] #![plugin(protobuf_macros)] #![plugin(json_macros)] diff --git a/src/player.rs b/src/player.rs index 1f0ca4e6..17078475 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,6 +1,6 @@ use eventual::{self, Async}; use portaudio; -use std::sync::{mpsc, Mutex, Arc, Condvar, MutexGuard}; +use std::sync::{mpsc, Mutex, Arc, MutexGuard}; use std::thread; use vorbis; @@ -10,8 +10,11 @@ use audio_decrypt::AudioDecrypt; use util::{self, SpotifyId, Subfile}; use spirc::PlayStatus; +pub type PlayerObserver = Box; + pub struct Player { - state: Arc<(Mutex, Condvar)>, + state: Arc>, + observers: Arc>>, commands: mpsc::Sender, } @@ -27,7 +30,9 @@ pub struct PlayerState { } struct PlayerInternal { - state: Arc<(Mutex, Condvar)>, + state: Arc>, + observers: Arc>>, + session: Session, commands: mpsc::Receiver, } @@ -45,20 +50,22 @@ impl Player { pub fn new(session: Session) -> Player { let (cmd_tx, cmd_rx) = mpsc::channel(); - let state = Arc::new((Mutex::new(PlayerState { + let state = Arc::new(Mutex::new(PlayerState { status: PlayStatus::kPlayStatusStop, position_ms: 0, position_measured_at: 0, update_time: util::now_ms(), volume: 0x8000, end_of_track: false, - }), - Condvar::new())); + })); + + let observers = Arc::new(Mutex::new(Vec::new())); let internal = PlayerInternal { session: session, commands: cmd_rx, state: state.clone(), + observers: observers.clone(), }; thread::spawn(move || internal.run()); @@ -66,6 +73,7 @@ impl Player { Player { commands: cmd_tx, state: state, + observers: observers, } } @@ -94,31 +102,15 @@ impl Player { } pub fn state(&self) -> MutexGuard { - self.state.0.lock().unwrap() + self.state.lock().unwrap() } pub fn volume(&self, vol: u16) { self.command(PlayerCommand::Volume(vol)); } - pub fn updates(&self) -> mpsc::Receiver { - let state = self.state.clone(); - let (update_tx, update_rx) = mpsc::channel(); - - thread::spawn(move || { - let mut guard = state.0.lock().unwrap(); - let mut last_update; - loop { - last_update = guard.update_time; - update_tx.send(guard.update_time).unwrap(); - - while last_update >= guard.update_time { - guard = state.1.wait(guard).unwrap(); - } - } - }); - - update_rx + pub fn add_observer(&self, observer: PlayerObserver) { + self.observers.lock().unwrap().push(observer); } } @@ -135,7 +127,7 @@ impl PlayerInternal { let mut decoder = None; loop { - let playing = self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay; + let playing = self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay; let cmd = if playing { self.commands.try_recv().ok() } else { @@ -252,14 +244,14 @@ impl PlayerInternal { None => (), } - if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay { + if self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay { match decoder.as_mut().unwrap().packets().next() { Some(Ok(packet)) => { let buffer = packet.data .iter() .map(|&x| { (x as i32 - * self.state.0.lock().unwrap().volume as i32 + * self.state.lock().unwrap().volume as i32 / 0xFFFF) as i16 }) .collect::>(); @@ -307,11 +299,15 @@ impl PlayerInternal { fn update(&self, f: F) where F: FnOnce(&mut MutexGuard) -> bool { - let mut guard = self.state.0.lock().unwrap(); + let mut guard = self.state.lock().unwrap(); let update = f(&mut guard); + + let observers = self.observers.lock().unwrap(); if update { guard.update_time = util::now_ms(); - self.state.1.notify_all(); + for observer in observers.iter() { + observer(&guard); + } } } } diff --git a/src/spirc.rs b/src/spirc.rs index 09483baa..7f1c72d2 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -6,16 +6,19 @@ use session::Session; use util::SpotifyId; use util::version::version_string; use mercury::{MercuryRequest, MercuryMethod}; -use player::Player; +use player::{Player, PlayerState}; + +use std::sync::{Mutex, Arc}; use librespot_protocol as protocol; pub use librespot_protocol::spirc::PlayStatus; -pub struct SpircManager { +pub struct SpircManager(Arc>); + +struct SpircInternal { player: Player, session: Session, - state_update_id: i64, seq_nr: u32, name: String, @@ -41,11 +44,10 @@ impl SpircManager { let ident = session.0.data.read().unwrap().device_id.clone(); let name = session.0.config.device_name.clone(); - SpircManager { + SpircManager(Arc::new(Mutex::new(SpircInternal { player: player, session: session, - state_update_id: 0, seq_nr: 0, name: name, @@ -64,56 +66,64 @@ impl SpircManager { tracks: Vec::new(), index: 0, - } + }))) } pub fn run(&mut self) { - let rx = self.session.mercury_sub(format!("hm://remote/user/{}/", - self.session - .0 - .data - .read() - .unwrap() - .canonical_username - .clone())); - let updates = self.player.updates(); + let rx = { + let mut internal = self.0.lock().unwrap(); - self.notify(true, None); + let rx = internal.session.mercury_sub(internal.uri()); - loop { - select! { - pkt = rx.recv() => { - let frame = protobuf::parse_from_bytes::( - pkt.unwrap().payload.first().unwrap()).unwrap(); + internal.notify(true, None); - println!("{:?} {} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); - if frame.get_ident() != self.ident && - (frame.get_recipient().len() == 0 || - frame.get_recipient().contains(&self.ident)) { - self.handle(frame); - } - }, - update_time = updates.recv() => { - let end_of_track = self.player.state().end_of_track(); - if end_of_track { - self.index = (self.index + 1) % self.tracks.len() as u32; - let track = self.tracks[self.index as usize]; - self.player.load(track, true, 0); - } else { - self.state_update_id = update_time.unwrap(); - self.notify(false, None); - } + // Use a weak pointer to avoid creating an Rc cycle between the player and the + // SpircManager + let _self = Arc::downgrade(&self.0); + internal.player.add_observer(Box::new(move |state| { + if let Some(_self) = _self.upgrade() { + let mut internal = _self.lock().unwrap(); + internal.on_update(state); } - } + })); + + rx + }; + + for pkt in rx { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); + + println!("{:?} {} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); + + self.0.lock().unwrap().handle(frame); + } + } +} + +impl SpircInternal { + fn on_update(&mut self, player_state: &PlayerState) { + let end_of_track = player_state.end_of_track(); + if end_of_track { + self.index = (self.index + 1) % self.tracks.len() as u32; + let track = self.tracks[self.index as usize]; + self.player.load(track, true, 0); + } else { + self.notify_with_player_state(false, None, player_state); } } fn handle(&mut self, frame: protocol::spirc::Frame) { + if frame.get_ident() == self.ident || + (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) { + return; + } + if frame.get_recipient().len() > 0 { self.last_command_ident = frame.get_ident().to_owned(); self.last_command_msgid = frame.get_seq_nr(); @@ -179,7 +189,12 @@ impl SpircManager { .map(|track| SpotifyId::from_raw(track.get_gid())) .collect(); } + + // FIXME: this entire function is duplicated in notify_with_player_state, but the borrow + // checker makes it hard to refactor fn notify(&mut self, hello: bool, recipient: Option<&str>) { + let player_state = self.player.state(); + let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { version: 1, ident: self.ident.clone(), @@ -191,22 +206,21 @@ impl SpircManager { protocol::spirc::MessageType::kMessageTypeNotify }, - device_state: self.device_state(), + device_state: self.device_state(&player_state), recipient: protobuf::RepeatedField::from_vec( recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) ), - state_update_id: self.state_update_id as i64 + state_update_id: player_state.update_time() as i64 }); if self.is_active { - pkt.set_state(self.spirc_state()); + pkt.set_state(self.spirc_state(&player_state)); } self.session .mercury(MercuryRequest { method: MercuryMethod::SEND, - uri: format!("hm://remote/user/{}", - self.session.0.data.read().unwrap().canonical_username.clone()), + uri: self.uri(), content_type: None, payload: vec![pkt.write_to_bytes().unwrap()], }) @@ -214,12 +228,47 @@ impl SpircManager { .unwrap(); } - fn spirc_state(&self) -> protocol::spirc::State { - let state = self.player.state(); - let (position_ms, position_measured_at) = state.position(); + fn notify_with_player_state(&mut self, + hello: bool, + recipient: Option<&str>, + player_state: &PlayerState) { + let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { + version: 1, + ident: self.ident.clone(), + protocol_version: "2.0.0".to_owned(), + seq_nr: { self.seq_nr += 1; self.seq_nr }, + typ: if hello { + protocol::spirc::MessageType::kMessageTypeHello + } else { + protocol::spirc::MessageType::kMessageTypeNotify + }, + + device_state: self.device_state(&player_state), + recipient: protobuf::RepeatedField::from_vec( + recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) + ), + state_update_id: player_state.update_time() as i64 + }); + + if self.is_active { + pkt.set_state(self.spirc_state(&player_state)); + } + + self.session + .mercury(MercuryRequest { + method: MercuryMethod::SEND, + uri: self.uri(), + content_type: None, + payload: vec![pkt.write_to_bytes().unwrap()], + }) + .fire(); + } + + fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State { + let (position_ms, position_measured_at) = player_state.position(); protobuf_init!(protocol::spirc::State::new(), { - status: state.status(), + status: player_state.status(), position_ms: position_ms, position_measured_at: position_measured_at as u64, @@ -240,12 +289,12 @@ impl SpircManager { }) } - fn device_state(&self) -> protocol::spirc::DeviceState { + fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState { protobuf_init!(protocol::spirc::DeviceState::new(), { sw_version: version_string(), is_active: self.is_active, can_play: self.can_play, - volume: self.player.state().volume() as u32, + volume: player_state.volume() as u32, name: self.name.clone(), error_code: 0, became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 }, @@ -299,4 +348,9 @@ impl SpircManager { ], }) } + + fn uri(&self) -> String { + format!("hm://remote/user/{}", + self.session.0.data.read().unwrap().canonical_username.clone()) + } }