diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 9d412714..6c766a4c 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -301,6 +301,10 @@ impl Future for SpircTask { loop { let mut progress = false; + if self.session.is_invalid() { + return Ok(Async::Ready(())); + } + if !self.shutdown { match self.subscription.poll().unwrap() { Async::Ready(Some(frame)) => { diff --git a/core/src/session.rs b/core/src/session.rs index ad0bf27a..335cf0e3 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -20,6 +20,7 @@ use mercury::MercuryManager; struct SessionData { country: String, canonical_username: String, + invalid: bool, } struct SessionInternal { @@ -77,7 +78,9 @@ impl Session { reusable_credentials.username.clone(), ); - handle.spawn(task.map_err(|e| panic!(e))); + handle.spawn(task.map_err(|e| { + error!("{:?}", e); + })); session }); @@ -104,6 +107,7 @@ impl Session { data: RwLock::new(SessionData { country: String::new(), canonical_username: username, + invalid: false, }), tx_connection: sender_tx, @@ -212,6 +216,15 @@ impl Session { pub fn session_id(&self) -> usize { self.0.session_id } + + pub fn shutdown(&self) { + debug!("Invalidating session[{}]", self.0.session_id); + self.0.data.write().unwrap().invalid = true; + } + + pub fn is_invalid(&self) -> bool { + self.0.data.read().unwrap().invalid + } } #[derive(Clone)] @@ -240,6 +253,7 @@ where impl Future for DispatchTask where S: Stream, + ::Error: ::std::fmt::Debug, { type Item = (); type Error = S::Error; @@ -251,7 +265,15 @@ where }; loop { - let (cmd, data) = try_ready!(self.0.poll()).expect("connection closed"); + let (cmd, data) = match self.0.poll() { + Ok(Async::Ready(t)) => t, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + session.shutdown(); + return Err(From::from(e)); + } + }.expect("connection closed"); + session.dispatch(cmd, data); } } diff --git a/playback/src/player.rs b/playback/src/player.rs index d8036830..dd994235 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -341,6 +341,10 @@ impl PlayerInternal { self.handle_packet(packet, current_normalisation_factor); } } + + if self.session.is_invalid() { + return; + } } }