mirror of
https://github.com/librespot-org/librespot.git
synced 2025-10-05 02:39:53 +02:00
Dealer: Rework context retrieval (#1414)
* connect: simplify `handle_command` for SpircCommand * connect: simplify `handle_player_event` * connect: `handle_player_event` update log entries * connect: set `playback_speed` according to player state * connect: reduce/group state updates by delaying them slightly * connect: load entire context at once * connect: use is_playing from connect_state * connect: move `ResolveContext` in own file * connect: handle metadata correct * connect: resolve context rework - resolved contexts independently, by that we don't block our main loop - move resolve logic into own file - polish handling for play and transfer * connect: rework aftermath * general logging and comment fixups * connect: fix incorrect stopping * connect: always handle player seek event * connect: adjust behavior - rename `handle_context` to `handle_next_context` - disconnect should only pause the playback - find_next should not exceed queue length * fix typo and `find_next` * connect: fixes for context and transfer - fix context_metadata and restriction incorrect reset - do some state updates earlier - add more logging * revert removal of state setup * `clear_next_tracks` should never clear queued items just mimics official client behavior * connect: make `playing_track` optional and handle it correctly * connect: adjust finish of context resolving * connect: set track position when shuffling * example: adjust to model change * connect: remove duplicate track * connect: provide all recently played tracks to autoplay request - removes previously added workaround * connect: apply review suggestions - use drain instead of for with pop - use for instead of loop - use or_else instead of match - use Self::Error instead of the value - free memory for metadata and restrictions * connect: impl trait for player context * connect: fix incorrect playing and paused * connect: apply options as official clients * protocol: move trait impls into impl_trait mod
This commit is contained in:
parent
c288cf7106
commit
f3bb380851
18 changed files with 1004 additions and 734 deletions
347
connect/src/context_resolver.rs
Normal file
347
connect/src/context_resolver.rs
Normal file
|
@ -0,0 +1,347 @@
|
|||
use crate::{
|
||||
core::{Error, Session},
|
||||
protocol::{
|
||||
autoplay_context_request::AutoplayContextRequest, context::Context,
|
||||
transfer_state::TransferState,
|
||||
},
|
||||
state::{
|
||||
context::{ContextType, UpdateContext},
|
||||
ConnectState,
|
||||
},
|
||||
};
|
||||
use std::cmp::PartialEq;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
fmt::{Display, Formatter},
|
||||
hash::Hash,
|
||||
time::Duration,
|
||||
};
|
||||
use thiserror::Error as ThisError;
|
||||
use tokio::time::Instant;
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
enum Resolve {
|
||||
Uri(String),
|
||||
Context(Context),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
pub(super) enum ContextAction {
|
||||
Append,
|
||||
Replace,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
pub(super) struct ResolveContext {
|
||||
resolve: Resolve,
|
||||
fallback: Option<String>,
|
||||
update: UpdateContext,
|
||||
action: ContextAction,
|
||||
}
|
||||
|
||||
impl ResolveContext {
|
||||
fn append_context(uri: impl Into<String>) -> Self {
|
||||
Self {
|
||||
resolve: Resolve::Uri(uri.into()),
|
||||
fallback: None,
|
||||
update: UpdateContext::Default,
|
||||
action: ContextAction::Append,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_uri(
|
||||
uri: impl Into<String>,
|
||||
fallback: impl Into<String>,
|
||||
update: UpdateContext,
|
||||
action: ContextAction,
|
||||
) -> Self {
|
||||
let fallback_uri = fallback.into();
|
||||
Self {
|
||||
resolve: Resolve::Uri(uri.into()),
|
||||
fallback: (!fallback_uri.is_empty()).then_some(fallback_uri),
|
||||
update,
|
||||
action,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self {
|
||||
Self {
|
||||
resolve: Resolve::Context(context),
|
||||
fallback: None,
|
||||
update,
|
||||
action,
|
||||
}
|
||||
}
|
||||
|
||||
/// the uri which should be used to resolve the context, might not be the context uri
|
||||
fn resolve_uri(&self) -> Option<&str> {
|
||||
// it's important to call this always, or at least for every ResolveContext
|
||||
// otherwise we might not even check if we need to fallback and just use the fallback uri
|
||||
match self.resolve {
|
||||
Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri),
|
||||
Resolve::Context(ref ctx) => ConnectState::get_context_uri_from_context(ctx),
|
||||
}
|
||||
.or(self.fallback.as_deref())
|
||||
}
|
||||
|
||||
/// the actual context uri
|
||||
fn context_uri(&self) -> &str {
|
||||
match self.resolve {
|
||||
Resolve::Uri(ref uri) => uri,
|
||||
Resolve::Context(ref ctx) => ctx.uri.as_deref().unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ResolveContext {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>",
|
||||
self.resolve_uri(),
|
||||
self.context_uri(),
|
||||
self.update,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
enum ContextResolverError {
|
||||
#[error("no next context to resolve")]
|
||||
NoNext,
|
||||
#[error("tried appending context with {0} pages")]
|
||||
UnexpectedPagesSize(usize),
|
||||
#[error("tried resolving not allowed context: {0:?}")]
|
||||
NotAllowedContext(String),
|
||||
}
|
||||
|
||||
impl From<ContextResolverError> for Error {
|
||||
fn from(value: ContextResolverError) -> Self {
|
||||
Error::failed_precondition(value)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContextResolver {
|
||||
session: Session,
|
||||
queue: VecDeque<ResolveContext>,
|
||||
unavailable_contexts: HashMap<ResolveContext, Instant>,
|
||||
}
|
||||
|
||||
// time after which an unavailable context is retried
|
||||
const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600);
|
||||
|
||||
impl ContextResolver {
|
||||
pub fn new(session: Session) -> Self {
|
||||
Self {
|
||||
session,
|
||||
queue: VecDeque::new(),
|
||||
unavailable_contexts: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, resolve: ResolveContext) {
|
||||
let last_try = self
|
||||
.unavailable_contexts
|
||||
.get(&resolve)
|
||||
.map(|i| i.duration_since(Instant::now()));
|
||||
|
||||
let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) {
|
||||
let _ = self.unavailable_contexts.remove(&resolve);
|
||||
debug!(
|
||||
"context was requested {}s ago, trying again to resolve the requested context",
|
||||
last_try.expect("checked by condition").as_secs()
|
||||
);
|
||||
None
|
||||
} else {
|
||||
last_try
|
||||
};
|
||||
|
||||
if last_try.is_some() {
|
||||
debug!("tried loading unavailable context: {resolve}");
|
||||
return;
|
||||
} else if self.queue.contains(&resolve) {
|
||||
debug!("update for {resolve} is already added");
|
||||
return;
|
||||
} else {
|
||||
trace!(
|
||||
"added {} to resolver queue",
|
||||
resolve.resolve_uri().unwrap_or(resolve.context_uri())
|
||||
)
|
||||
}
|
||||
|
||||
self.queue.push_back(resolve)
|
||||
}
|
||||
|
||||
pub fn add_list(&mut self, resolve: Vec<ResolveContext>) {
|
||||
for resolve in resolve {
|
||||
self.add(resolve)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_used_and_invalid(&mut self) {
|
||||
if let Some((_, _, remove)) = self.find_next() {
|
||||
let _ = self.queue.drain(0..remove); // remove invalid
|
||||
}
|
||||
self.queue.pop_front(); // remove used
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.queue = VecDeque::new()
|
||||
}
|
||||
|
||||
fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> {
|
||||
for idx in 0..self.queue.len() {
|
||||
let next = self.queue.get(idx)?;
|
||||
match next.resolve_uri() {
|
||||
None => {
|
||||
warn!("skipped {idx} because of invalid resolve_uri: {next}");
|
||||
continue;
|
||||
}
|
||||
Some(uri) => return Some((next, uri, idx)),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn has_next(&self) -> bool {
|
||||
self.find_next().is_some()
|
||||
}
|
||||
|
||||
pub async fn get_next_context(
|
||||
&self,
|
||||
recent_track_uri: impl Fn() -> Vec<String>,
|
||||
) -> Result<Context, Error> {
|
||||
let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
|
||||
|
||||
match next.update {
|
||||
UpdateContext::Default => {
|
||||
let mut ctx = self.session.spclient().get_context(resolve_uri).await;
|
||||
if let Ok(ctx) = ctx.as_mut() {
|
||||
ctx.uri = Some(next.context_uri().to_string());
|
||||
ctx.url = ctx.uri.as_ref().map(|s| format!("context://{s}"));
|
||||
}
|
||||
|
||||
ctx
|
||||
}
|
||||
UpdateContext::Autoplay => {
|
||||
if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:")
|
||||
{
|
||||
// autoplay is not supported for podcasts
|
||||
Err(ContextResolverError::NotAllowedContext(
|
||||
resolve_uri.to_string(),
|
||||
))?
|
||||
}
|
||||
|
||||
let request = AutoplayContextRequest {
|
||||
context_uri: Some(resolve_uri.to_string()),
|
||||
recent_track_uri: recent_track_uri(),
|
||||
..Default::default()
|
||||
};
|
||||
self.session.spclient().get_autoplay_context(&request).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mark_next_unavailable(&mut self) {
|
||||
if let Some((next, _, _)) = self.find_next() {
|
||||
self.unavailable_contexts
|
||||
.insert(next.clone(), Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn apply_next_context(
|
||||
&self,
|
||||
state: &mut ConnectState,
|
||||
mut context: Context,
|
||||
) -> Result<Option<Vec<ResolveContext>>, Error> {
|
||||
let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
|
||||
|
||||
let remaining = match next.action {
|
||||
ContextAction::Append if context.pages.len() == 1 => state
|
||||
.fill_context_from_page(context.pages.remove(0))
|
||||
.map(|_| None),
|
||||
ContextAction::Replace => {
|
||||
let remaining = state.update_context(context, next.update);
|
||||
if let Resolve::Context(ref ctx) = next.resolve {
|
||||
state.merge_context(Some(ctx.clone()));
|
||||
}
|
||||
|
||||
remaining
|
||||
}
|
||||
ContextAction::Append => {
|
||||
warn!("unexpected page size: {context:#?}");
|
||||
Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into())
|
||||
}
|
||||
}?;
|
||||
|
||||
Ok(remaining.map(|remaining| {
|
||||
remaining
|
||||
.into_iter()
|
||||
.map(ResolveContext::append_context)
|
||||
.collect::<Vec<_>>()
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn try_finish(
|
||||
&self,
|
||||
state: &mut ConnectState,
|
||||
transfer_state: &mut Option<TransferState>,
|
||||
) -> bool {
|
||||
let (next, _, _) = match self.find_next() {
|
||||
None => return false,
|
||||
Some(next) => next,
|
||||
};
|
||||
|
||||
// when there is only one update type, we are the last of our kind, so we should update the state
|
||||
if self
|
||||
.queue
|
||||
.iter()
|
||||
.filter(|resolve| resolve.update == next.update)
|
||||
.count()
|
||||
!= 1
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
match (next.update, state.active_context) {
|
||||
(UpdateContext::Default, ContextType::Default) | (UpdateContext::Autoplay, _) => {
|
||||
debug!(
|
||||
"last item of type <{:?}>, finishing state setup",
|
||||
next.update
|
||||
);
|
||||
}
|
||||
(UpdateContext::Default, _) => {
|
||||
debug!("skipped finishing default, because it isn't the active context");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let active_ctx = state.get_context(state.active_context);
|
||||
let res = if let Some(transfer_state) = transfer_state.take() {
|
||||
state.finish_transfer(transfer_state)
|
||||
} else if state.shuffling_context() {
|
||||
state.shuffle()
|
||||
} else if matches!(active_ctx, Ok(ctx) if ctx.index.track == 0) {
|
||||
// has context, and context is not touched
|
||||
// when the index is not zero, the next index was already evaluated elsewhere
|
||||
let ctx = active_ctx.expect("checked by precondition");
|
||||
let idx = ConnectState::find_index_in_context(ctx, |t| {
|
||||
state.current_track(|c| t.uri == c.uri)
|
||||
})
|
||||
.ok();
|
||||
|
||||
state.reset_playback_to_position(idx)
|
||||
} else {
|
||||
state.fill_up_next_tracks()
|
||||
};
|
||||
|
||||
if let Err(why) = res {
|
||||
error!("setup of state failed: {why}, last used resolve {next:#?}")
|
||||
}
|
||||
|
||||
state.update_restrictions();
|
||||
state.update_queue_revision();
|
||||
|
||||
true
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue