diff --git a/core/src/session.rs b/core/src/session.rs index fd706798..b0eca0c0 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -39,6 +39,8 @@ struct SessionInternal { mercury: OnceCell, cache: Option>, + handle: tokio::runtime::Handle, + session_id: usize, } @@ -65,7 +67,13 @@ impl Session { cache.save_credentials(&reusable_credentials); } - let session = Session::create(conn, config, cache, reusable_credentials.username); + let session = Session::create( + conn, + config, + cache, + reusable_credentials.username, + tokio::runtime::Handle::current(), + ); Ok(session) } @@ -75,6 +83,7 @@ impl Session { config: SessionConfig, cache: Option, username: String, + handle: tokio::runtime::Handle, ) -> Session { let (sink, stream) = transport.split(); @@ -100,6 +109,8 @@ impl Session { channel: OnceCell::new(), mercury: OnceCell::new(), + handle, + session_id: session_id, })); @@ -139,7 +150,7 @@ impl Session { T: Future + Send + 'static, T::Output: Send + 'static, { - tokio::spawn(task); + self.0.handle.spawn(task); } fn debug_info(&self) { diff --git a/playback/src/player.rs b/playback/src/player.rs index 6f6a85ae..3ee5c989 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -7,7 +7,6 @@ use crate::audio::{ use crate::audio_backend::Sink; use crate::config::NormalisationType; use crate::config::{Bitrate, PlayerConfig}; -use crate::librespot_core::tokio; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; use librespot_core::session::Session; @@ -15,25 +14,22 @@ use librespot_core::spotify_id::SpotifyId; use librespot_core::util::SeqGenerator; use byteorder::{LittleEndian, ReadBytesExt}; -use futures::{ - channel::{mpsc, oneshot}, - future, Future, Stream, StreamExt, -}; -use std::io::{Read, Seek, SeekFrom}; -use std::mem; +use futures::channel::{mpsc, oneshot}; +use futures::{future, Future, Stream, StreamExt, TryFutureExt}; +use std::borrow::Cow; + +use std::cmp::max; +use std::io::{self, Read, Seek, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use std::{borrow::Cow, io}; -use std::{ - cmp::max, - pin::Pin, - task::{Context, Poll}, -}; +use std::{mem, thread}; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; pub struct Player { commands: Option>, - task_handle: Option>, + thread_handle: Option>, play_request_id_generator: SeqGenerator, } @@ -251,33 +247,33 @@ impl Player { let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded(); - debug!("new Player[{}]", session.session_id()); + let handle = thread::spawn(move || { + debug!("new Player[{}]", session.session_id()); - let internal = PlayerInternal { - session: session, - config: config, - commands: cmd_rx, + let internal = PlayerInternal { + session: session, + config: config, + commands: cmd_rx, - state: PlayerState::Stopped, - preload: PlayerPreload::None, - sink: sink_builder(), - sink_status: SinkStatus::Closed, - sink_event_callback: None, - audio_filter: audio_filter, - event_senders: [event_sender].to_vec(), - }; + state: PlayerState::Stopped, + preload: PlayerPreload::None, + sink: sink_builder(), + sink_status: SinkStatus::Closed, + sink_event_callback: None, + audio_filter: audio_filter, + event_senders: [event_sender].to_vec(), + }; - // While PlayerInternal is written as a future, it still contains blocking code. - // It must be run by using wait() in a dedicated thread. - let handle = tokio::spawn(async move { - internal.await; + // While PlayerInternal is written as a future, it still contains blocking code. + // It must be run by using wait() in a dedicated thread. + futures::executor::block_on(internal); debug!("PlayerInternal thread finished."); }); ( Player { commands: Some(cmd_tx), - task_handle: Some(handle), + thread_handle: Some(handle), play_request_id_generator: SeqGenerator::new(0), }, event_receiver, @@ -351,13 +347,11 @@ impl Drop for Player { fn drop(&mut self) { debug!("Shutting down player thread ..."); self.commands = None; - if let Some(handle) = self.task_handle.take() { - tokio::spawn(async { - match handle.await { - Ok(_) => (), - Err(_) => error!("Player thread panicked!"), - } - }); + if let Some(handle) = self.thread_handle.take() { + match handle.join() { + Ok(_) => (), + Err(_) => error!("Player thread panicked!"), + } } } } @@ -436,15 +430,23 @@ impl PlayerState { #[allow(dead_code)] fn is_stopped(&self) -> bool { - matches!(self, Self::Stopped) + use self::PlayerState::*; + match *self { + Stopped => true, + _ => false, + } } fn is_loading(&self) -> bool { - matches!(self, Self::Loading { .. }) + use self::PlayerState::*; + match *self { + Loading { .. } => true, + _ => false, + } } fn decoder(&mut self) -> Option<&mut Decoder> { - use PlayerState::*; + use self::PlayerState::*; match *self { Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { @@ -1243,9 +1245,10 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = tokio::task::block_in_place(|| { - loaded_track.decoder.seek(position_ms as i64) - }); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. + // But most likely the track is fully + // loaded already because we played + // to the end of it. loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1278,7 +1281,7 @@ impl PlayerInternal { // we can use the current decoder. Ensure it's at the correct position. if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { stream_loader_controller.set_random_access_mode(); - let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64)); + let _ = decoder.seek(position_ms as i64); // This may be blocking. stream_loader_controller.set_stream_mode(); *stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1346,9 +1349,7 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = tokio::task::block_in_place(|| { - loaded_track.decoder.seek(position_ms as i64) - }); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking loaded_track.stream_loader_controller.set_stream_mode(); } self.start_playback(track_id, play_request_id, *loaded_track, play); @@ -1563,7 +1564,7 @@ impl PlayerInternal { } } - pub fn load_track( + fn load_track( &self, spotify_id: SpotifyId, position_ms: u32, @@ -1574,22 +1575,23 @@ impl PlayerInternal { // easily. Instead we spawn a thread to do the work and return a one-shot channel as the // future to work with. - let session = self.session.clone(); - let config = self.config.clone(); + let loader = PlayerTrackLoader { + session: self.session.clone(), + config: self.config.clone(), + }; - async move { - let loader = PlayerTrackLoader { session, config }; + let (result_tx, result_rx) = oneshot::channel(); - let (result_tx, result_rx) = oneshot::channel(); - - tokio::spawn(async move { - if let Some(data) = loader.load_track(spotify_id, position_ms).await { + std::thread::spawn(move || { + futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then( + move |data| { let _ = result_tx.send(data); - } - }); + Some(()) + }, + ); + }); - result_rx.await.map_err(|_| ()) - } + result_rx.map_err(|_| ()) } fn preload_data_before_playback(&mut self) { @@ -1615,9 +1617,7 @@ impl PlayerInternal { * bytes_per_second as f64) as usize, (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); - tokio::task::block_in_place(|| { - stream_loader_controller.fetch_next_blocking(wait_for_data_length) - }); + stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } }