use crate::{ context_resolver::{ContextAction, ContextResolver, ResolveContext}, core::{ authentication::Credentials, dealer::{ manager::{BoxedStream, BoxedStreamResult, Reply, RequestReply}, protocol::{Command, Message, Request}, }, session::UserAttributes, Error, Session, SpotifyId, }, model::{LoadRequest, PlayingTrack, SpircPlayStatus}, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel}, }, protocol::{ connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand}, context::Context, explicit_content_pubsub::UserAttributesUpdate, playlist4_external::PlaylistModificationInfo, social_connect_v2::SessionUpdate, transfer_state::TransferState, user_attributes::UserAttributesMutation, }, state::{ context::{ContextType, ResetContext}, provider::IsProvider, {ConnectConfig, ConnectState}, }, LoadContextOptions, LoadRequestOptions, }; use futures_util::StreamExt; use protobuf::MessageField; use std::{ future::Future, sync::atomic::{AtomicUsize, Ordering}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use thiserror::Error; use tokio::{sync::mpsc, time::sleep}; #[derive(Debug, Error)] enum SpircError { #[error("response payload empty")] NoData, #[error("{0} had no uri")] NoUri(&'static str), #[error("message pushed for another URI")] InvalidUri(String), #[error("failed to put connect state for new device")] FailedDealerSetup, #[error("unknown endpoint: {0:#?}")] UnknownEndpoint(serde_json::Value), } impl From for Error { fn from(err: SpircError) -> Self { use SpircError::*; match err { NoData | NoUri(_) => Error::unavailable(err), InvalidUri(_) | FailedDealerSetup => Error::aborted(err), UnknownEndpoint(_) => Error::unimplemented(err), } } } struct SpircTask { player: Arc, mixer: Arc, /// the state management object connect_state: ConnectState, play_request_id: Option, play_status: SpircPlayStatus, connection_id_update: BoxedStreamResult, connect_state_update: BoxedStreamResult, connect_state_volume_update: BoxedStreamResult, connect_state_logout_request: BoxedStreamResult, playlist_update: BoxedStreamResult, session_update: BoxedStreamResult, connect_state_command: BoxedStream, user_attributes_update: BoxedStreamResult, user_attributes_mutation: BoxedStreamResult, commands: Option>, player_events: Option, context_resolver: ContextResolver, shutdown: bool, session: Session, /// is set when transferring, and used after resolving the contexts to finish the transfer pub transfer_state: Option, /// when set to true, it will update the volume after [VOLUME_UPDATE_DELAY], /// when no other future resolves, otherwise resets the delay update_volume: bool, /// when set to true, it will update the volume after [UPDATE_STATE_DELAY], /// when no other future resolves, otherwise resets the delay update_state: bool, spirc_id: usize, } static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0); #[derive(Debug)] enum SpircCommand { Play, PlayPause, Pause, Prev, Next, VolumeUp, VolumeDown, Shutdown, Shuffle(bool), Repeat(bool), RepeatTrack(bool), Disconnect { pause: bool }, SetPosition(u32), SetVolume(u16), Activate, Load(LoadRequest), } const CONTEXT_FETCH_THRESHOLD: usize = 2; // delay to update volume after a certain amount of time, instead on each update request const VOLUME_UPDATE_DELAY: Duration = Duration::from_millis(500); // to reduce updates to remote, we group some request by waiting for a set amount of time const UPDATE_STATE_DELAY: Duration = Duration::from_millis(200); /// The spotify connect handle pub struct Spirc { commands: mpsc::UnboundedSender, } impl Spirc { /// Initializes a new spotify connect device /// /// The returned tuple consists out of a handle to the [`Spirc`] that /// can control the local connect device when active. And a [`Future`] /// which represents the [`Spirc`] event loop that processes the whole /// connect device logic. pub async fn new( config: ConnectConfig, session: Session, credentials: Credentials, player: Arc, mixer: Arc, ) -> Result<(Spirc, impl Future), Error> { fn extract_connection_id(msg: Message) -> Result { let connection_id = msg .headers .get("Spotify-Connection-Id") .ok_or_else(|| SpircError::InvalidUri(msg.uri.clone()))?; Ok(connection_id.to_owned()) } let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Spirc[{}]", spirc_id); let connect_state = ConnectState::new(config, &session); let connection_id_update = session .dealer() .listen_for("hm://pusher/v1/connections/", extract_connection_id)?; let connect_state_update = session .dealer() .listen_for("hm://connect-state/v1/cluster", Message::from_raw)?; let connect_state_volume_update = session .dealer() .listen_for("hm://connect-state/v1/connect/volume", Message::from_raw)?; let connect_state_logout_request = session .dealer() .listen_for("hm://connect-state/v1/connect/logout", Message::from_raw)?; let playlist_update = session .dealer() .listen_for("hm://playlist/v2/playlist/", Message::from_raw)?; let session_update = session .dealer() .listen_for("social-connect/v2/session_update", Message::from_json)?; let user_attributes_update = session .dealer() .listen_for("spotify:user:attributes:update", Message::from_raw)?; // can be trigger by toggling autoplay in a desktop client let user_attributes_mutation = session .dealer() .listen_for("spotify:user:attributes:mutated", Message::from_raw)?; let connect_state_command = session .dealer() .handle_for("hm://connect-state/v1/player/command")?; // pre-acquire client_token, preventing multiple request while running let _ = session.spclient().client_token().await?; // Connect *after* all message listeners are registered session.connect(credentials, true).await?; // pre-acquire access_token (we need to be authenticated to retrieve a token) let _ = session.login5().auth_token().await?; let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let player_events = player.get_player_event_channel(); let mut task = SpircTask { player, mixer, connect_state, play_request_id: None, play_status: SpircPlayStatus::Stopped, connection_id_update, connect_state_update, connect_state_volume_update, connect_state_logout_request, playlist_update, session_update, connect_state_command, user_attributes_update, user_attributes_mutation, commands: Some(cmd_rx), player_events: Some(player_events), context_resolver: ContextResolver::new(session.clone()), shutdown: false, session, transfer_state: None, update_volume: false, update_state: false, spirc_id, }; let spirc = Spirc { commands: cmd_tx }; let initial_volume = task.connect_state.device_info().volume; task.connect_state.set_volume(0); match initial_volume.try_into() { Ok(volume) => { task.set_volume(volume); // we don't want to update the volume initially, // we just want to set the mixer to the correct volume task.update_volume = false; } Err(why) => error!("failed to update initial volume: {why}"), }; Ok((spirc, task.run())) } /// Safely shutdowns the spirc. /// /// This pauses the playback, disconnects the connect device and /// bring the future initially returned to an end. pub fn shutdown(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Shutdown)?) } /// Resumes the playback /// /// Does nothing if we are not the active device, or it isn't paused. pub fn play(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Play)?) } /// Resumes or pauses the playback /// /// Does nothing if we are not the active device. pub fn play_pause(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::PlayPause)?) } /// Pauses the playback /// /// Does nothing if we are not the active device, or if it isn't playing. pub fn pause(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Pause)?) } /// Seeks to the beginning or skips to the previous track. /// /// Seeks to the beginning when the current track position /// is greater than 3 seconds. /// /// Does nothing if we are not the active device. pub fn prev(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Prev)?) } /// Skips to the next track. /// /// Does nothing if we are not the active device. pub fn next(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Next)?) } /// Increases the volume by configured steps of [ConnectConfig]. /// /// Does nothing if we are not the active device. pub fn volume_up(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::VolumeUp)?) } /// Decreases the volume by configured steps of [ConnectConfig]. /// /// Does nothing if we are not the active device. pub fn volume_down(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::VolumeDown)?) } /// Shuffles the playback according to the value. /// /// If true shuffles/reshuffles the playback. Otherwise, does /// nothing (if not shuffled) or unshuffles the playback while /// resuming at the position of the current track. /// /// Does nothing if we are not the active device. pub fn shuffle(&self, shuffle: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Shuffle(shuffle))?) } /// Repeats the playback context according to the value. /// /// Does nothing if we are not the active device. pub fn repeat(&self, repeat: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Repeat(repeat))?) } /// Repeats the current track if true. /// /// Does nothing if we are not the active device. /// /// Skipping to the next track disables the repeating. pub fn repeat_track(&self, repeat: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::RepeatTrack(repeat))?) } /// Update the volume to the given value. /// /// Does nothing if we are not the active device. pub fn set_volume(&self, volume: u16) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::SetVolume(volume))?) } /// Updates the position to the given value. /// /// Does nothing if we are not the active device. /// /// If value is greater than the track duration, /// the update is ignored. pub fn set_position_ms(&self, position_ms: u32) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::SetPosition(position_ms))?) } /// Load a new context and replace the current. /// /// Does nothing if we are not the active device. /// /// Does not overwrite the queue. pub fn load(&self, command: LoadRequest) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Load(command))?) } /// Disconnects the current device and pauses the playback according the value. /// /// Does nothing if we are not the active device. pub fn disconnect(&self, pause: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Disconnect { pause })?) } /// Acquires the control as active connect device. /// /// Does nothing if we are not the active device. pub fn activate(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Activate)?) } } impl SpircTask { async fn run(mut self) { // simplify unwrapping of received item or parsed result macro_rules! unwrap { ( $next:expr, |$some:ident| $use_some:expr ) => { match $next { Some($some) => $use_some, None => { error!("{} selected, but none received", stringify!($next)); break; } } }; ( $next:expr, match |$ok:ident| $use_ok:expr ) => { unwrap!($next, |$ok| match $ok { Ok($ok) => $use_ok, Err(why) => error!("could not parse {}: {}", stringify!($ok), why), }) }; } if let Err(why) = self.session.dealer().start().await { error!("starting dealer failed: {why}"); return; } while !self.session.is_invalid() && !self.shutdown { let commands = self.commands.as_mut(); let player_events = self.player_events.as_mut(); // when state and volume update have a higher priority than context resolving // because of that the context resolving has to wait, so that the other tasks can finish let allow_context_resolving = !self.update_state && !self.update_volume; tokio::select! { // startup of the dealer requires a connection_id, which is retrieved at the very beginning connection_id_update = self.connection_id_update.next() => unwrap! { connection_id_update, match |connection_id| if let Err(why) = self.handle_connection_id_update(connection_id).await { error!("failed handling connection id update: {why}"); break; } }, // main dealer update of any remote device updates cluster_update = self.connect_state_update.next() => unwrap! { cluster_update, match |cluster_update| if let Err(e) = self.handle_cluster_update(cluster_update).await { error!("could not dispatch connect state update: {}", e); } }, // main dealer request handling (dealer expects an answer) request = self.connect_state_command.next() => unwrap! { request, |request| if let Err(e) = self.handle_connect_state_request(request).await { error!("couldn't handle connect state command: {}", e); } }, // volume request handling is send separately (it's more like a fire forget) volume_update = self.connect_state_volume_update.next() => unwrap! { volume_update, match |volume_update| match volume_update.volume.try_into() { Ok(volume) => self.set_volume(volume), Err(why) => error!("can't update volume, failed to parse i32 to u16: {why}") } }, logout_request = self.connect_state_logout_request.next() => unwrap! { logout_request, |logout_request| { error!("received logout request, currently not supported: {logout_request:#?}"); // todo: call logout handling } }, playlist_update = self.playlist_update.next() => unwrap! { playlist_update, match |playlist_update| if let Err(why) = self.handle_playlist_modification(playlist_update) { error!("failed to handle playlist modification: {why}") } }, user_attributes_update = self.user_attributes_update.next() => unwrap! { user_attributes_update, match |attributes| self.handle_user_attributes_update(attributes) }, user_attributes_mutation = self.user_attributes_mutation.next() => unwrap! { user_attributes_mutation, match |attributes| self.handle_user_attributes_mutation(attributes) }, session_update = self.session_update.next() => unwrap! { session_update, match |session_update| self.handle_session_update(session_update) }, cmd = async { commands?.recv().await }, if commands.is_some() => if let Some(cmd) = cmd { if let Err(e) = self.handle_command(cmd).await { debug!("could not dispatch command: {}", e); } }, event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event { if let Err(e) = self.handle_player_event(event) { error!("could not dispatch player event: {}", e); } }, _ = async { sleep(UPDATE_STATE_DELAY).await }, if self.update_state => { self.update_state = false; if let Err(why) = self.notify().await { error!("state update: {why}") } }, _ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => { self.update_volume = false; info!("delayed volume update for all devices: volume is now {}", self.connect_state.device_info().volume); if let Err(why) = self.connect_state.notify_volume_changed(&self.session).await { error!("error updating connect state for volume update: {why}") } // for some reason the web-player does need two separate updates, so that the // position of the current track is retained, other clients also send a state // update before they send the volume update if let Err(why) = self.notify().await { error!("error updating connect state for volume update: {why}") } }, // context resolver handling, the idea/reason behind it the following: // // when we request a context that has multiple pages (for example an artist) // resolving all pages at once can take around ~1-30sec, when we resolve // everything at once that would block our main loop for that time // // to circumvent this behavior, we request each context separately here and // finish after we received our last item of a type next_context = async { self.context_resolver.get_next_context(|| { self.connect_state.recent_track_uris() }).await }, if allow_context_resolving && self.context_resolver.has_next() => { let update_state = self.handle_next_context(next_context); if update_state { if let Err(why) = self.notify().await { error!("update after context resolving failed: {why}") } } }, else => break } } if !self.shutdown && self.connect_state.is_active() { warn!("unexpected shutdown"); if let Err(why) = self.handle_disconnect().await { error!("error during disconnecting: {why}") } } self.session.dealer().close().await; } fn handle_next_context(&mut self, next_context: Result) -> bool { let next_context = match next_context { Err(why) => { self.context_resolver.mark_next_unavailable(); self.context_resolver.remove_used_and_invalid(); error!("{why}"); return false; } Ok(ctx) => ctx, }; debug!("handling next context {:?}", next_context.uri); match self .context_resolver .apply_next_context(&mut self.connect_state, next_context) { Ok(remaining) => { if let Some(remaining) = remaining { self.context_resolver.add_list(remaining) } } Err(why) => { error!("{why}") } } let update_state = if self .context_resolver .try_finish(&mut self.connect_state, &mut self.transfer_state) { self.add_autoplay_resolving_when_required(); true } else { false }; self.context_resolver.remove_used_and_invalid(); update_state } // todo: is the time_delta still necessary? fn now_ms(&self) -> i64 { let dur = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_else(|err| err.duration()); dur.as_millis() as i64 + 1000 * self.session.time_delta() } async fn handle_command(&mut self, cmd: SpircCommand) -> Result<(), Error> { trace!("Received SpircCommand::{:?}", cmd); match cmd { SpircCommand::Shutdown => { trace!("Received SpircCommand::Shutdown"); self.handle_pause(); self.handle_disconnect().await?; self.shutdown = true; if let Some(rx) = self.commands.as_mut() { rx.close() } } SpircCommand::Activate if !self.connect_state.is_active() => { trace!("Received SpircCommand::{:?}", cmd); self.handle_activate(); return self.notify().await; } SpircCommand::Activate => warn!( "SpircCommand::{:?} will be ignored while already active", cmd ), _ if !self.connect_state.is_active() => { warn!("SpircCommand::{:?} will be ignored while Not Active", cmd) } SpircCommand::Disconnect { pause } => { if pause { self.handle_pause() } return self.handle_disconnect().await; } SpircCommand::Play => self.handle_play(), SpircCommand::PlayPause => self.handle_play_pause(), SpircCommand::Pause => self.handle_pause(), SpircCommand::Prev => self.handle_prev()?, SpircCommand::Next => self.handle_next(None)?, SpircCommand::VolumeUp => self.handle_volume_up(), SpircCommand::VolumeDown => self.handle_volume_down(), SpircCommand::Shuffle(shuffle) => self.handle_shuffle(shuffle)?, SpircCommand::Repeat(repeat) => self.handle_repeat_context(repeat)?, SpircCommand::RepeatTrack(repeat) => self.handle_repeat_track(repeat), SpircCommand::SetPosition(position) => self.handle_seek(position), SpircCommand::SetVolume(volume) => self.set_volume(volume), SpircCommand::Load(command) => self.handle_load(command, None).await?, }; self.notify().await } fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { if let PlayerEvent::TrackChanged { audio_item } = event { self.connect_state.update_duration(audio_item.duration_ms); self.update_state = true; return Ok(()); } // update play_request_id if let PlayerEvent::PlayRequestIdChanged { play_request_id } = event { self.play_request_id = Some(play_request_id); return Ok(()); } let is_current_track = matches! { (event.get_play_request_id(), self.play_request_id), (Some(event_id), Some(current_id)) if event_id == current_id }; // we only process events if the play_request_id matches. If it doesn't, it is // an event that belongs to a previous track and only arrives now due to a race // condition. In this case we have updated the state already and don't want to // mess with it. if !is_current_track { return Ok(()); } match event { PlayerEvent::EndOfTrack { .. } => { let next_track = self .connect_state .repeat_track() .then(|| self.connect_state.current_track(|t| t.uri.clone())); self.handle_next(next_track)? } PlayerEvent::Loading { .. } => match self.play_status { SpircPlayStatus::LoadingPlay { position_ms } => { self.connect_state .update_position(position_ms, self.now_ms()); trace!("==> LoadingPlay"); } SpircPlayStatus::LoadingPause { position_ms } => { self.connect_state .update_position(position_ms, self.now_ms()); trace!("==> LoadingPause"); } _ => { self.connect_state.update_position(0, self.now_ms()); trace!("==> Loading"); } }, PlayerEvent::Seeked { position_ms, .. } => { trace!("==> Seeked"); self.connect_state .update_position(position_ms, self.now_ms()) } PlayerEvent::Playing { position_ms, .. } | PlayerEvent::PositionCorrection { position_ms, .. } => { trace!("==> Playing"); let new_nominal_start_time = self.now_ms() - position_ms as i64; match self.play_status { SpircPlayStatus::Playing { ref mut nominal_start_time, .. } => { if (*nominal_start_time - new_nominal_start_time).abs() > 100 { *nominal_start_time = new_nominal_start_time; self.connect_state .update_position(position_ms, self.now_ms()); } else { return Ok(()); } } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { self.connect_state .update_position(position_ms, self.now_ms()); self.play_status = SpircPlayStatus::Playing { nominal_start_time: new_nominal_start_time, preloading_of_next_track_triggered: false, }; } _ => return Ok(()), } } PlayerEvent::Paused { position_ms: new_position_ms, .. } => { trace!("==> Paused"); match self.play_status { SpircPlayStatus::Paused { .. } | SpircPlayStatus::Playing { .. } => { self.connect_state .update_position(new_position_ms, self.now_ms()); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, preloading_of_next_track_triggered: false, }; } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { self.connect_state .update_position(new_position_ms, self.now_ms()); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, preloading_of_next_track_triggered: false, }; } _ => return Ok(()), } } PlayerEvent::Stopped { .. } => { trace!("==> Stopped"); match self.play_status { SpircPlayStatus::Stopped => return Ok(()), _ => self.play_status = SpircPlayStatus::Stopped, } } PlayerEvent::TimeToPreloadNextTrack { .. } => { self.handle_preload_next_track(); return Ok(()); } PlayerEvent::Unavailable { track_id, .. } => { self.handle_unavailable(track_id)?; if self.connect_state.current_track(|t| &t.uri) == &track_id.to_uri()? { self.handle_next(None)? } } _ => return Ok(()), } self.update_state = true; Ok(()) } async fn handle_connection_id_update(&mut self, connection_id: String) -> Result<(), Error> { trace!("Received connection ID update: {:?}", connection_id); self.session.set_connection_id(&connection_id); let cluster = match self .connect_state .notify_new_device_appeared(&self.session) .await { Ok(res) => Cluster::parse_from_bytes(&res).ok(), Err(why) => { error!("{why:?}"); None } } .ok_or(SpircError::FailedDealerSetup)?; debug!( "successfully put connect state for {} with connection-id {connection_id}", self.session.device_id() ); let same_session = cluster.player_state.session_id == self.session.session_id() || cluster.player_state.session_id.is_empty(); if !cluster.active_device_id.is_empty() || !same_session { info!( "active device is <{}> with session <{}>", cluster.active_device_id, cluster.player_state.session_id ); return Ok(()); } else if cluster.transfer_data.is_empty() { debug!("got empty transfer state, do nothing"); return Ok(()); } else { info!( "trying to take over control automatically, session_id: {}", cluster.player_state.session_id ) } use protobuf::Message; match TransferState::parse_from_bytes(&cluster.transfer_data) { Ok(transfer_state) => self.handle_transfer(transfer_state)?, Err(why) => error!("failed to take over control: {why}"), } Ok(()) } fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) { trace!("Received attributes update: {:#?}", update); let attributes: UserAttributes = update .pairs .iter() .map(|(key, value)| (key.to_owned(), value.to_owned())) .collect(); self.session.set_user_attributes(attributes) } fn handle_user_attributes_mutation(&mut self, mutation: UserAttributesMutation) { for attribute in mutation.fields.iter() { let key = &attribute.name; if key == "autoplay" && self.session.config().autoplay.is_some() { trace!("Autoplay override active. Ignoring mutation."); continue; } if let Some(old_value) = self.session.user_data().attributes.get(key) { let new_value = match old_value.as_ref() { "0" => "1", "1" => "0", _ => old_value, }; self.session.set_user_attribute(key, new_value); trace!( "Received attribute mutation, {} was {} is now {}", key, old_value, new_value ); if key == "filter-explicit-content" && new_value == "1" { self.player .emit_filter_explicit_content_changed_event(matches!(new_value, "1")); } if key == "autoplay" && old_value != new_value { self.player .emit_auto_play_changed_event(matches!(new_value, "1")); self.add_autoplay_resolving_when_required() } } else { trace!( "Received attribute mutation for {} but key was not found!", key ); } } } async fn handle_cluster_update( &mut self, mut cluster_update: ClusterUpdate, ) -> Result<(), Error> { let reason = cluster_update.update_reason.enum_value(); let device_ids = cluster_update.devices_that_changed.join(", "); debug!( "cluster update: {reason:?} from {device_ids}, active device: {}", cluster_update.cluster.active_device_id ); if let Some(cluster) = cluster_update.cluster.take() { let became_inactive = self.connect_state.is_active() && cluster.active_device_id != self.session.device_id(); if became_inactive { info!("device became inactive"); self.connect_state.became_inactive(&self.session).await?; self.handle_stop() } else if self.connect_state.is_active() { // fixme: workaround fix, because of missing information why it behaves like it does // background: when another device sends a connect-state update, some player's position de-syncs // tried: providing session_id, playback_id, track-metadata "track_player" self.update_state = true; } } else if self.connect_state.is_active() { self.connect_state.became_inactive(&self.session).await?; } Ok(()) } async fn handle_connect_state_request( &mut self, (request, sender): RequestReply, ) -> Result<(), Error> { self.connect_state.set_last_command(request.clone()); debug!( "handling: '{}' from {}", request.command, request.sent_by_device_id ); let response = match self.handle_request(request).await { Ok(_) => Reply::Success, Err(why) => { error!("failed to handle request: {why}"); Reply::Failure } }; sender.send(response).map_err(Into::into) } async fn handle_request(&mut self, request: Request) -> Result<(), Error> { use Command::*; match request.command { // errors and unknown commands Transfer(transfer) if transfer.data.is_none() => { warn!("transfer endpoint didn't contain any data to transfer"); Err(SpircError::NoData)? } Unknown(unknown) => Err(SpircError::UnknownEndpoint(unknown))?, // implicit update of the connect_state UpdateContext(update_context) => { if matches!(update_context.context.uri, Some(ref uri) if uri != self.connect_state.context_uri()) { debug!( "ignoring context update for <{:?}>, because it isn't the current context <{}>", update_context.context.uri, self.connect_state.context_uri() ) } else { self.context_resolver.add(ResolveContext::from_context( update_context.context, ContextType::Default, ContextAction::Replace, )) } return Ok(()); } // modification and update of the connect_state Transfer(transfer) => { self.handle_transfer(transfer.data.expect("by condition checked"))?; return self.notify().await; } Play(play) => { let context_uri = play .context .uri .clone() .ok_or(SpircError::NoUri("context"))?; let context_options = play .options .player_options_override .map(Into::into) .map(LoadContextOptions::Options); self.handle_load( LoadRequest::from_context_uri( context_uri, LoadRequestOptions { start_playing: true, seek_to: play.options.seek_to.unwrap_or_default(), playing_track: play.options.skip_to.and_then(|s| s.try_into().ok()), context_options, }, ), Some(play.context), ) .await?; self.connect_state.set_origin(play.play_origin) } Pause(_) => self.handle_pause(), SeekTo(seek_to) => { // for some reason the position is stored in value, not in position trace!("seek to {seek_to:?}"); self.handle_seek(seek_to.value) } SetShufflingContext(shuffle) => self.handle_shuffle(shuffle.value)?, SetRepeatingContext(repeat_context) => { self.handle_repeat_context(repeat_context.value)? } SetRepeatingTrack(repeat_track) => self.handle_repeat_track(repeat_track.value), AddToQueue(add_to_queue) => self.connect_state.add_to_queue(add_to_queue.track, true), SetQueue(set_queue) => self.connect_state.handle_set_queue(set_queue), SetOptions(set_options) => { if let Some(repeat_context) = set_options.repeating_context { self.handle_repeat_context(repeat_context)? } if let Some(repeat_track) = set_options.repeating_track { self.handle_repeat_track(repeat_track) } let shuffle = set_options.shuffling_context; if let Some(shuffle) = shuffle { self.handle_shuffle(shuffle)?; } } SkipNext(skip_next) => self.handle_next(skip_next.track.map(|t| t.uri))?, SkipPrev(_) => self.handle_prev()?, Resume(_) if matches!(self.play_status, SpircPlayStatus::Stopped) => { self.load_track(true, 0)? } Resume(_) => self.handle_play(), } self.update_state = true; Ok(()) } fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> { let mut ctx_uri = match transfer.current_session.context.uri { None => Err(SpircError::NoUri("transfer context"))?, Some(ref uri) => uri.clone(), }; self.connect_state .reset_context(ResetContext::WhenDifferent(&ctx_uri)); match self.connect_state.current_track_from_transfer(&transfer) { Err(why) => warn!("didn't find initial track: {why}"), Ok(track) => { debug!("found initial track <{}>", track.uri); self.connect_state.set_track(track) } }; let autoplay = self.connect_state.current_track(|t| t.is_autoplay()); if autoplay { ctx_uri = ctx_uri.replace("station:", ""); } let fallback = self.connect_state.current_track(|t| &t.uri).clone(); self.context_resolver.add(ResolveContext::from_uri( ctx_uri.clone(), &fallback, ContextType::Default, ContextAction::Replace, )); let timestamp = self.now_ms(); let state = &mut self.connect_state; state.set_active(true); state.handle_initial_transfer(&mut transfer); // adjust active context, so resolve knows for which context it should set up the state state.active_context = if autoplay { ContextType::Autoplay } else { ContextType::Default }; // update position if the track continued playing let transfer_timestamp = transfer.playback.timestamp.unwrap_or_default(); let position = match transfer.playback.position_as_of_timestamp { Some(position) if transfer.playback.is_paused.unwrap_or_default() => position.into(), // update position if the track continued playing Some(position) if position > 0 => { let time_since_position_update = timestamp - transfer_timestamp; i64::from(position) + time_since_position_update } _ => 0, }; let is_playing = !transfer.playback.is_paused(); if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay { debug!("currently in autoplay context, async resolving autoplay for {ctx_uri}"); self.context_resolver.add(ResolveContext::from_uri( ctx_uri, fallback, ContextType::Autoplay, ContextAction::Replace, )) } self.transfer_state = Some(transfer); self.load_track(is_playing, position.try_into()?) } async fn handle_disconnect(&mut self) -> Result<(), Error> { self.context_resolver.clear(); self.play_status = SpircPlayStatus::Stopped {}; self.connect_state .update_position_in_relation(self.now_ms()); self.notify().await?; self.connect_state.became_inactive(&self.session).await?; // this should clear the active session id, leaving an empty state self.session .spclient() .delete_connect_state_request() .await?; self.player .emit_session_disconnected_event(self.session.connection_id(), self.session.username()); Ok(()) } fn handle_stop(&mut self) { self.player.stop(); self.connect_state.update_position(0, self.now_ms()); self.connect_state.clear_next_tracks(); if let Err(why) = self.connect_state.reset_playback_to_position(None) { warn!("failed filling up next_track during stopping: {why}") } } fn handle_activate(&mut self) { self.connect_state.set_active(true); self.player .emit_session_connected_event(self.session.connection_id(), self.session.username()); self.player.emit_session_client_changed_event( self.session.client_id(), self.session.client_name(), self.session.client_brand_name(), self.session.client_model_name(), ); self.player .emit_volume_changed_event(self.connect_state.device_info().volume as u16); self.player .emit_auto_play_changed_event(self.session.autoplay()); self.player .emit_filter_explicit_content_changed_event(self.session.filter_explicit_content()); self.player .emit_shuffle_changed_event(self.connect_state.shuffling_context()); self.player.emit_repeat_changed_event( self.connect_state.repeat_context(), self.connect_state.repeat_track(), ); } async fn handle_load( &mut self, cmd: LoadRequest, context: Option, ) -> Result<(), Error> { self.connect_state .reset_context(ResetContext::WhenDifferent(&cmd.context_uri)); self.connect_state.reset_options(); if !self.connect_state.is_active() { self.handle_activate(); } let fallback = if let Some(ref ctx) = context { match ConnectState::get_context_uri_from_context(ctx) { Some(ctx_uri) => ctx_uri, None => Err(SpircError::InvalidUri(cmd.context_uri.clone()))?, } } else { &cmd.context_uri }; let update_context = if matches!(cmd.context_options, Some(LoadContextOptions::Autoplay)) { ContextType::Autoplay } else { ContextType::Default }; self.connect_state.set_active_context(update_context); let current_context_uri = self.connect_state.context_uri(); if current_context_uri == &cmd.context_uri && fallback == cmd.context_uri { debug!("context <{current_context_uri}> didn't change, no resolving required") } else { debug!("resolving context for load command"); self.context_resolver.clear(); self.context_resolver.add(ResolveContext::from_uri( &cmd.context_uri, fallback, update_context, ContextAction::Replace, )); let context = self.context_resolver.get_next_context(Vec::new).await; self.handle_next_context(context); } // for play commands with skip by uid, the context of the command contains // tracks with uri and uid, so we merge the new context with the resolved/existing context self.connect_state.merge_context(context); // load here, so that we clear the queue only after we definitely retrieved a new context self.connect_state.clear_next_tracks(); self.connect_state.clear_restrictions(); debug!("play track <{:?}>", cmd.playing_track); let index = match cmd.playing_track { None => None, Some(ref playing_track) => Some(match playing_track { PlayingTrack::Index(i) => *i as usize, PlayingTrack::Uri(uri) => { let ctx = self.connect_state.get_context(ContextType::Default)?; ConnectState::find_index_in_context(ctx, |t| &t.uri == uri)? } PlayingTrack::Uid(uid) => { let ctx = self.connect_state.get_context(ContextType::Default)?; ConnectState::find_index_in_context(ctx, |t| &t.uid == uid)? } }), }; if let Some(LoadContextOptions::Options(ref options)) = cmd.context_options { debug!( "loading with shuffle: <{}>, repeat track: <{}> context: <{}>", options.shuffle, options.repeat, options.repeat_track ); self.connect_state.set_shuffle(options.shuffle); self.connect_state.set_repeat_context(options.repeat); self.connect_state.set_repeat_track(options.repeat_track); } if matches!(cmd.context_options, Some(LoadContextOptions::Options(ref o)) if o.shuffle) { if let Some(index) = index { self.connect_state.set_current_track(index)?; } else { self.connect_state.set_current_track_random()?; } if self.context_resolver.has_next() { self.connect_state.update_queue_revision() } else { self.connect_state.shuffle(None)?; self.add_autoplay_resolving_when_required(); } } else { self.connect_state .set_current_track(index.unwrap_or_default())?; self.connect_state.reset_playback_to_position(index)?; self.add_autoplay_resolving_when_required(); } if self.connect_state.current_track(MessageField::is_some) { self.load_track(cmd.start_playing, cmd.seek_to)?; } else { info!("No active track, stopping"); self.handle_stop() } Ok(()) } fn handle_play(&mut self) { match self.play_status { SpircPlayStatus::Paused { position_ms, preloading_of_next_track_triggered, } => { self.player.play(); self.connect_state .update_position(position_ms, self.now_ms()); self.play_status = SpircPlayStatus::Playing { nominal_start_time: self.now_ms() - position_ms as i64, preloading_of_next_track_triggered, }; } SpircPlayStatus::LoadingPause { position_ms } => { self.player.play(); self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; } _ => return, } // Synchronize the volume from the mixer. This is useful on // systems that can switch sources from and back to librespot. let current_volume = self.mixer.volume(); self.set_volume(current_volume); } fn handle_play_pause(&mut self) { match self.play_status { SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => { self.handle_play() } SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => { self.handle_pause() } _ => (), } } fn handle_pause(&mut self) { match self.play_status { SpircPlayStatus::Playing { nominal_start_time, preloading_of_next_track_triggered, } => { self.player.pause(); let position_ms = (self.now_ms() - nominal_start_time) as u32; self.connect_state .update_position(position_ms, self.now_ms()); self.play_status = SpircPlayStatus::Paused { position_ms, preloading_of_next_track_triggered, }; } SpircPlayStatus::LoadingPlay { position_ms } => { self.player.pause(); self.play_status = SpircPlayStatus::LoadingPause { position_ms }; } _ => (), } } fn handle_seek(&mut self, position_ms: u32) { let duration = self.connect_state.player().duration; if i64::from(position_ms) > duration { warn!("tried to seek to {position_ms}ms of {duration}ms"); return; } self.connect_state .update_position(position_ms, self.now_ms()); self.player.seek(position_ms); let now = self.now_ms(); match self.play_status { SpircPlayStatus::Stopped => (), SpircPlayStatus::LoadingPause { position_ms: ref mut position, } | SpircPlayStatus::LoadingPlay { position_ms: ref mut position, } | SpircPlayStatus::Paused { position_ms: ref mut position, .. } => *position = position_ms, SpircPlayStatus::Playing { ref mut nominal_start_time, .. } => *nominal_start_time = now - position_ms as i64, }; } fn handle_shuffle(&mut self, shuffle: bool) -> Result<(), Error> { self.player.emit_shuffle_changed_event(shuffle); self.connect_state.handle_shuffle(shuffle) } fn handle_repeat_context(&mut self, repeat: bool) -> Result<(), Error> { self.player .emit_repeat_changed_event(repeat, self.connect_state.repeat_track()); self.connect_state.handle_set_repeat_context(repeat) } fn handle_repeat_track(&mut self, repeat: bool) { self.player .emit_repeat_changed_event(self.connect_state.repeat_context(), repeat); self.connect_state.set_repeat_track(repeat); } fn handle_preload_next_track(&mut self) { // Requests the player thread to preload the next track match self.play_status { SpircPlayStatus::Paused { ref mut preloading_of_next_track_triggered, .. } | SpircPlayStatus::Playing { ref mut preloading_of_next_track_triggered, .. } => { *preloading_of_next_track_triggered = true; } _ => (), } if let Some(track_id) = self.connect_state.preview_next_track() { self.player.preload(track_id); } } // Mark unavailable tracks so we can skip them later fn handle_unavailable(&mut self, track_id: SpotifyId) -> Result<(), Error> { self.connect_state.mark_unavailable(track_id)?; self.handle_preload_next_track(); Ok(()) } fn add_autoplay_resolving_when_required(&mut self) { let require_load_new = !self .connect_state .has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD)) && self.session.autoplay(); if !require_load_new { return; } let current_context = self.connect_state.context_uri(); let fallback = self.connect_state.current_track(|t| &t.uri); let has_tracks = self .connect_state .get_context(ContextType::Autoplay) .map(|c| !c.tracks.is_empty()) .unwrap_or_default(); let resolve = ResolveContext::from_uri( current_context, fallback, ContextType::Autoplay, if has_tracks { ContextAction::Append } else { ContextAction::Replace }, ); self.context_resolver.add(resolve); } fn handle_next(&mut self, track_uri: Option) -> Result<(), Error> { let continue_playing = self.connect_state.is_playing(); let current_uri = self.connect_state.current_track(|t| &t.uri); let mut has_next_track = matches!(track_uri, Some(ref track_uri) if current_uri == track_uri); if !has_next_track { has_next_track = loop { let index = self.connect_state.next_track()?; let current_uri = self.connect_state.current_track(|t| &t.uri); if matches!(track_uri, Some(ref track_uri) if current_uri != track_uri) { continue; } else { break index.is_some(); } }; }; if has_next_track { self.add_autoplay_resolving_when_required(); self.load_track(continue_playing, 0) } else { info!("Not playing next track because there are no more tracks left in queue."); self.handle_stop(); Ok(()) } } fn handle_prev(&mut self) -> Result<(), Error> { // Previous behaves differently based on the position // Under 3s it goes to the previous song (starts playing) // Over 3s it seeks to zero (retains previous play status) if self.position() < 3000 { let repeat_context = self.connect_state.repeat_context(); match self.connect_state.prev_track()? { None if repeat_context => self.connect_state.reset_playback_to_position(None)?, None => { self.connect_state.reset_playback_to_position(None)?; self.handle_stop() } Some(_) => self.load_track(self.connect_state.is_playing(), 0)?, } } else { self.handle_seek(0); } Ok(()) } fn handle_volume_up(&mut self) { let volume = (self.connect_state.device_info().volume as u16) .saturating_add(self.connect_state.volume_step_size); self.set_volume(volume); } fn handle_volume_down(&mut self) { let volume = (self.connect_state.device_info().volume as u16) .saturating_sub(self.connect_state.volume_step_size); self.set_volume(volume); } fn handle_playlist_modification( &mut self, playlist_modification_info: PlaylistModificationInfo, ) -> Result<(), Error> { let uri = playlist_modification_info .uri .ok_or(SpircError::NoUri("playlist modification"))?; let uri = String::from_utf8(uri)?; if self.connect_state.context_uri() != &uri { debug!("ignoring playlist modification update for playlist <{uri}>, because it isn't the current context"); return Ok(()); } debug!("playlist modification for current context: {uri}"); self.context_resolver.add(ResolveContext::from_uri( uri, self.connect_state.current_track(|t| &t.uri), ContextType::Default, ContextAction::Replace, )); Ok(()) } fn handle_session_update(&mut self, mut session_update: SessionUpdate) { let reason = session_update.reason.enum_value(); let mut session = match session_update.session.take() { None => return, Some(session) => session, }; let active_device = session.host_active_device_id.take(); if matches!(active_device, Some(ref device) if device == self.session.device_id()) { info!( "session update: <{:?}> for self, current session_id {}, new session_id {}", reason, self.session.session_id(), session.session_id ); if self.session.session_id() != session.session_id { self.session.set_session_id(&session.session_id); self.connect_state.set_session_id(session.session_id); } } else { debug!("session update: <{reason:?}> from active session host: <{active_device:?}>"); } // this seems to be used for jams or handling the current session_id // // handling this event was intended to keep the playback when other clients (primarily // mobile) connects, otherwise they would steel the current playback when there was no // session_id provided on the initial PutStateReason::NEW_DEVICE state update // // by generating an initial session_id from the get-go we prevent that behavior and // currently don't need to handle this event, might still be useful for later "jam" support } fn position(&mut self) -> u32 { match self.play_status { SpircPlayStatus::Stopped => 0, SpircPlayStatus::LoadingPlay { position_ms } | SpircPlayStatus::LoadingPause { position_ms } | SpircPlayStatus::Paused { position_ms, .. } => position_ms, SpircPlayStatus::Playing { nominal_start_time, .. } => (self.now_ms() - nominal_start_time) as u32, } } fn load_track(&mut self, start_playing: bool, position_ms: u32) -> Result<(), Error> { if self.connect_state.current_track(MessageField::is_none) { debug!("current track is none, stopping playback"); self.handle_stop(); return Ok(()); } let current_uri = self.connect_state.current_track(|t| &t.uri); let id = SpotifyId::from_uri(current_uri)?; self.player.load(id, start_playing, position_ms); self.connect_state .update_position(position_ms, self.now_ms()); if start_playing { self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; } else { self.play_status = SpircPlayStatus::LoadingPause { position_ms }; } self.connect_state.set_status(&self.play_status); Ok(()) } async fn notify(&mut self) -> Result<(), Error> { self.connect_state.set_status(&self.play_status); if self.connect_state.is_playing() { self.connect_state .update_position_in_relation(self.now_ms()); } self.connect_state.set_now(self.now_ms() as u64); self.connect_state .send_state(&self.session) .await .map(|_| ()) } fn set_volume(&mut self, volume: u16) { debug!("SpircTask::set_volume({})", volume); let old_volume = self.connect_state.device_info().volume; let new_volume = volume as u32; if old_volume != new_volume || self.mixer.volume() != volume { self.update_volume = true; self.connect_state.set_volume(new_volume); self.mixer.set_volume(volume); if let Some(cache) = self.session.cache() { cache.save_volume(volume) } if self.connect_state.is_active() { self.player.emit_volume_changed_event(volume); } } } } impl Drop for SpircTask { fn drop(&mut self) { debug!("drop Spirc[{}]", self.spirc_id); } }