1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-06 03:50:06 +02:00

Change panics into Result<_, librespot_core::Error>

This commit is contained in:
Roderick van Domburg 2021-12-26 21:18:42 +01:00
parent a297c68913
commit 62461be1fc
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
69 changed files with 2041 additions and 1331 deletions

View file

@ -1,8 +1,11 @@
use std::io;
use aes_ctr::cipher::generic_array::GenericArray;
use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::Aes128Ctr;
use aes_ctr::{
cipher::{
generic_array::GenericArray, NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek,
},
Aes128Ctr,
};
use librespot_core::audio_key::AudioKey;

View file

@ -1,54 +1,57 @@
mod receive;
use std::cmp::{max, min};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
fs,
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{self, AtomicUsize},
Arc, Condvar, Mutex,
},
time::{Duration, Instant},
};
use futures_util::future::IntoStream;
use futures_util::{StreamExt, TryFutureExt};
use hyper::client::ResponseFuture;
use hyper::header::CONTENT_RANGE;
use hyper::Body;
use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
use hyper::{client::ResponseFuture, header::CONTENT_RANGE, Body, Response, StatusCode};
use tempfile::NamedTempFile;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use librespot_core::cdn_url::{CdnUrl, CdnUrlError};
use librespot_core::file_id::FileId;
use librespot_core::session::Session;
use librespot_core::spclient::SpClientError;
use librespot_core::{cdn_url::CdnUrl, Error, FileId, Session};
use self::receive::audio_file_fetch;
use crate::range_set::{Range, RangeSet};
pub type AudioFileResult = Result<(), AudioFileError>;
pub type AudioFileResult = Result<(), librespot_core::Error>;
#[derive(Error, Debug)]
pub enum AudioFileError {
#[error("could not complete CDN request: {0}")]
Cdn(#[from] hyper::Error),
#[error("channel was disconnected")]
#[error("other end of channel disconnected")]
Channel,
#[error("empty response")]
Empty,
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("output file unavailable")]
#[error("required header not found")]
Header,
#[error("streamer received no data")]
NoData,
#[error("no output available")]
Output,
#[error("error parsing response")]
Parsing,
#[error("mutex was poisoned")]
Poisoned,
#[error("could not complete API request: {0}")]
SpClient(#[from] SpClientError),
#[error("streamer did not report progress")]
Timeout,
#[error("could not get CDN URL: {0}")]
Url(#[from] CdnUrlError),
#[error("invalid status code {0}")]
StatusCode(StatusCode),
#[error("wait timeout exceeded")]
WaitTimeout,
}
impl From<AudioFileError> for Error {
fn from(err: AudioFileError) -> Self {
match err {
AudioFileError::Channel => Error::aborted(err),
AudioFileError::Header => Error::unavailable(err),
AudioFileError::NoData => Error::unavailable(err),
AudioFileError::Output => Error::aborted(err),
AudioFileError::StatusCode(_) => Error::failed_precondition(err),
AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
}
}
}
/// The minimum size of a block that is requested from the Spotify servers in one request.
@ -124,7 +127,7 @@ pub enum AudioFile {
#[derive(Debug)]
pub struct StreamingRequest {
streamer: IntoStream<ResponseFuture>,
initial_body: Option<Body>,
initial_response: Option<Response<Body>>,
offset: usize,
length: usize,
request_time: Instant,
@ -154,12 +157,9 @@ impl StreamLoaderController {
self.file_size == 0
}
pub fn range_available(&self, range: Range) -> Result<bool, AudioFileError> {
pub fn range_available(&self, range: Range) -> bool {
let available = if let Some(ref shared) = self.stream_shared {
let download_status = shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let download_status = shared.download_status.lock().unwrap();
range.length
<= download_status
@ -169,16 +169,16 @@ impl StreamLoaderController {
range.length <= self.len() - range.start
};
Ok(available)
available
}
pub fn range_to_end_available(&self) -> Result<bool, AudioFileError> {
pub fn range_to_end_available(&self) -> bool {
match self.stream_shared {
Some(ref shared) => {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position))
}
None => Ok(true),
None => true,
}
}
@ -190,7 +190,8 @@ impl StreamLoaderController {
fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
if let Some(ref channel) = self.channel_tx {
// ignore the error in case the channel has been closed already.
// Ignore the error in case the channel has been closed already.
// This means that the file was completely downloaded.
let _ = channel.send(command);
}
}
@ -213,10 +214,7 @@ impl StreamLoaderController {
self.fetch(range);
if let Some(ref shared) = self.stream_shared {
let mut download_status = shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let mut download_status = shared.download_status.lock().unwrap();
while range.length
> download_status
@ -226,7 +224,7 @@ impl StreamLoaderController {
download_status = shared
.cond
.wait_timeout(download_status, DOWNLOAD_TIMEOUT)
.map_err(|_| AudioFileError::Timeout)?
.map_err(|_| AudioFileError::WaitTimeout)?
.0;
if range.length
> (download_status
@ -319,7 +317,7 @@ impl AudioFile {
file_id: FileId,
bytes_per_second: usize,
play_from_beginning: bool,
) -> Result<AudioFile, AudioFileError> {
) -> Result<AudioFile, Error> {
if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id);
return Ok(AudioFile::Cached(file));
@ -340,9 +338,14 @@ impl AudioFile {
let session_ = session.clone();
session.spawn(complete_rx.map_ok(move |mut file| {
if let Some(cache) = session_.cache() {
if cache.save_file(file_id, &mut file) {
debug!("File {} cached to {:?}", file_id, cache.file(file_id));
if let Some(cache_id) = cache.file(file_id) {
if let Err(e) = cache.save_file(file_id, &mut file) {
error!("Error caching file {} to {:?}: {}", file_id, cache_id, e);
} else {
debug!("File {} cached to {:?}", file_id, cache_id);
}
}
debug!("Downloading file {} complete", file_id);
}
}));
@ -350,7 +353,7 @@ impl AudioFile {
Ok(AudioFile::Streaming(streaming.await?))
}
pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, AudioFileError> {
pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
let controller = match self {
AudioFile::Streaming(ref stream) => StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
@ -379,7 +382,7 @@ impl AudioFileStreaming {
complete_tx: oneshot::Sender<NamedTempFile>,
bytes_per_second: usize,
play_from_beginning: bool,
) -> Result<AudioFileStreaming, AudioFileError> {
) -> Result<AudioFileStreaming, Error> {
let download_size = if play_from_beginning {
INITIAL_DOWNLOAD_SIZE
+ max(
@ -392,8 +395,8 @@ impl AudioFileStreaming {
INITIAL_DOWNLOAD_SIZE
};
let mut cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
let url = cdn_url.get_url()?;
let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
let url = cdn_url.try_get_url()?;
trace!("Streaming {:?}", url);
@ -403,23 +406,19 @@ impl AudioFileStreaming {
// Get the first chunk with the headers to get the file size.
// The remainder of that chunk with possibly also a response body is then
// further processed in `audio_file_fetch`.
let response = match streamer.next().await {
Some(Ok(data)) => data,
Some(Err(e)) => return Err(AudioFileError::Cdn(e)),
None => return Err(AudioFileError::Empty),
};
let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
let header_value = response
.headers()
.get(CONTENT_RANGE)
.ok_or(AudioFileError::Parsing)?;
let str_value = header_value.to_str().map_err(|_| AudioFileError::Parsing)?;
let file_size_str = str_value.split('/').last().ok_or(AudioFileError::Parsing)?;
let file_size = file_size_str.parse().map_err(|_| AudioFileError::Parsing)?;
.ok_or(AudioFileError::Header)?;
let str_value = header_value.to_str()?;
let file_size_str = str_value.split('/').last().unwrap_or_default();
let file_size = file_size_str.parse()?;
let initial_request = StreamingRequest {
streamer,
initial_body: Some(response.into_body()),
initial_response: Some(response),
offset: 0,
length: download_size,
request_time,
@ -474,12 +473,7 @@ impl Read for AudioFileStreaming {
let length = min(output.len(), self.shared.file_size - offset);
let length_to_request = match *(self
.shared
.download_strategy
.lock()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))?)
{
let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
DownloadStrategy::RandomAccess() => length,
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
@ -503,42 +497,32 @@ impl Read for AudioFileStreaming {
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length_to_request));
let mut download_status = self
.shared
.download_status
.lock()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))?;
let mut download_status = self.shared.download_status.lock().unwrap();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for &range in ranges_to_request.iter() {
self.stream_loader_command_tx
.send(StreamLoaderCommand::Fetch(range))
.map_err(|_| io::Error::new(io::ErrorKind::Other, "tx channel is disconnected"))?;
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
}
if length == 0 {
return Ok(0);
}
let mut download_message_printed = false;
while !download_status.downloaded.contains(offset) {
if let DownloadStrategy::Streaming() = *self
.shared
.download_strategy
.lock()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))?
{
if !download_message_printed {
debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
download_message_printed = true;
}
}
download_status = self
.shared
.cond
.wait_timeout(download_status, DOWNLOAD_TIMEOUT)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "timeout acquiring mutex"))?
.map_err(|_| {
io::Error::new(
io::ErrorKind::TimedOut,
Error::deadline_exceeded(AudioFileError::WaitTimeout),
)
})?
.0;
}
let available_length = download_status
@ -551,15 +535,6 @@ impl Read for AudioFileStreaming {
let read_len = min(length, available_length);
let read_len = self.read_file.read(&mut output[..read_len])?;
if download_message_printed {
debug!(
"Read at postion {} completed. {} bytes returned, {} bytes were requested.",
offset,
read_len,
output.len()
);
}
self.position += read_len as u64;
self.shared
.read_position

View file

@ -1,25 +1,25 @@
use std::cmp::{max, min};
use std::io::{Seek, SeekFrom, Write};
use std::sync::{atomic, Arc};
use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
io::{Seek, SeekFrom, Write},
sync::{atomic, Arc},
time::{Duration, Instant},
};
use atomic::Ordering;
use bytes::Bytes;
use futures_util::StreamExt;
use hyper::StatusCode;
use tempfile::NamedTempFile;
use tokio::sync::{mpsc, oneshot};
use librespot_core::session::Session;
use librespot_core::{session::Session, Error};
use crate::range_set::{Range, RangeSet};
use super::{
AudioFileError, AudioFileResult, AudioFileShared, DownloadStrategy, StreamLoaderCommand,
StreamingRequest,
};
use super::{
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
StreamingRequest, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME,
MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
};
struct PartialFileData {
@ -49,19 +49,27 @@ async fn receive_data(
let mut measure_ping_time = old_number_of_request == 0;
let result = loop {
let body = match request.initial_body.take() {
let result: Result<_, Error> = loop {
let response = match request.initial_response.take() {
Some(data) => data,
None => match request.streamer.next().await {
Some(Ok(response)) => response.into_body(),
Some(Err(e)) => break Err(e),
Some(Ok(response)) => response,
Some(Err(e)) => break Err(e.into()),
None => break Ok(()),
},
};
let code = response.status();
let body = response.into_body();
if code != StatusCode::PARTIAL_CONTENT {
debug!("Streamer expected partial content but got: {}", code);
break Err(AudioFileError::StatusCode(code).into());
}
let data = match hyper::body::to_bytes(body).await {
Ok(bytes) => bytes,
Err(e) => break Err(e),
Err(e) => break Err(e.into()),
};
if measure_ping_time {
@ -69,16 +77,16 @@ async fn receive_data(
if duration > MAXIMUM_ASSUMED_PING_TIME {
duration = MAXIMUM_ASSUMED_PING_TIME;
}
let _ = file_data_tx.send(ReceivedData::ResponseTime(duration));
file_data_tx.send(ReceivedData::ResponseTime(duration))?;
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx.send(ReceivedData::Data(PartialFileData {
file_data_tx.send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data,
}));
}))?;
data_offset += data_size;
if request_length < data_size {
warn!(
@ -100,10 +108,8 @@ async fn receive_data(
if request_length > 0 {
let missing_range = Range::new(data_offset, request_length);
let mut download_status = shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all();
}
@ -127,7 +133,7 @@ async fn receive_data(
"Error from streamer for range {} (+{}): {:?}",
requested_offset, requested_length, e
);
Err(e.into())
Err(e)
}
}
}
@ -150,14 +156,8 @@ enum ControlFlow {
}
impl AudioFileFetch {
fn get_download_strategy(&mut self) -> Result<DownloadStrategy, AudioFileError> {
let strategy = self
.shared
.download_strategy
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
Ok(*(strategy))
fn get_download_strategy(&mut self) -> DownloadStrategy {
*(self.shared.download_strategy.lock().unwrap())
}
fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
@ -172,52 +172,34 @@ impl AudioFileFetch {
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length));
let mut download_status = self
.shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let mut download_status = self.shared.download_status.lock().unwrap();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
let cdn_url = &self.shared.cdn_url;
let file_id = cdn_url.file_id;
for range in ranges_to_request.iter() {
match cdn_url.urls.first() {
Some(url) => {
match self
.session
.spclient()
.stream_file(&url.0, range.start, range.length)
{
Ok(streamer) => {
download_status.requested.add_range(range);
let url = self.shared.cdn_url.try_get_url()?;
let streaming_request = StreamingRequest {
streamer,
initial_body: None,
offset: range.start,
length: range.length,
request_time: Instant::now(),
};
let streamer = self
.session
.spclient()
.stream_file(url, range.start, range.length)?;
self.session.spawn(receive_data(
self.shared.clone(),
self.file_data_tx.clone(),
streaming_request,
));
}
Err(e) => {
error!("Unable to open stream for track <{}>: {:?}", file_id, e);
}
}
}
None => {
error!("Unable to get CDN URL for track <{}>", file_id);
}
}
download_status.requested.add_range(range);
let streaming_request = StreamingRequest {
streamer,
initial_response: None,
offset: range.start,
length: range.length,
request_time: Instant::now(),
};
self.session.spawn(receive_data(
self.shared.clone(),
self.file_data_tx.clone(),
streaming_request,
));
}
Ok(())
@ -236,11 +218,8 @@ impl AudioFileFetch {
let mut missing_data = RangeSet::new();
missing_data.add_range(&Range::new(0, self.shared.file_size));
{
let download_status = self
.shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let download_status = self.shared.download_status.lock().unwrap();
missing_data.subtract_range_set(&download_status.downloaded);
missing_data.subtract_range_set(&download_status.requested);
}
@ -277,7 +256,7 @@ impl AudioFileFetch {
Ok(())
}
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, AudioFileError> {
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> {
match data {
ReceivedData::ResponseTime(response_time) => {
let old_ping_time_ms = self.shared.ping_time_ms.load(Ordering::Relaxed);
@ -324,14 +303,10 @@ impl AudioFileFetch {
output.seek(SeekFrom::Start(data.offset as u64))?;
output.write_all(data.data.as_ref())?;
}
None => return Err(AudioFileError::Output),
None => return Err(AudioFileError::Output.into()),
}
let mut download_status = self
.shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let mut download_status = self.shared.download_status.lock().unwrap();
let received_range = Range::new(data.offset, data.data.len());
download_status.downloaded.add_range(&received_range);
@ -355,38 +330,38 @@ impl AudioFileFetch {
fn handle_stream_loader_command(
&mut self,
cmd: StreamLoaderCommand,
) -> Result<ControlFlow, AudioFileError> {
) -> Result<ControlFlow, Error> {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length)?;
}
StreamLoaderCommand::RandomAccessMode() => {
*(self
.shared
.download_strategy
.lock()
.map_err(|_| AudioFileError::Poisoned)?) = DownloadStrategy::RandomAccess();
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(self
.shared
.download_strategy
.lock()
.map_err(|_| AudioFileError::Poisoned)?) = DownloadStrategy::Streaming();
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return Ok(ControlFlow::Break),
}
Ok(ControlFlow::Continue)
}
fn finish(&mut self) -> AudioFileResult {
let mut output = self.output.take().ok_or(AudioFileError::Output)?;
let complete_tx = self.complete_tx.take().ok_or(AudioFileError::Output)?;
let output = self.output.take();
output.seek(SeekFrom::Start(0))?;
complete_tx
.send(output)
.map_err(|_| AudioFileError::Channel)
let complete_tx = self.complete_tx.take();
if let Some(mut output) = output {
output.seek(SeekFrom::Start(0))?;
if let Some(complete_tx) = complete_tx {
complete_tx
.send(output)
.map_err(|_| AudioFileError::Channel)?;
}
}
Ok(())
}
}
@ -405,10 +380,8 @@ pub(super) async fn audio_file_fetch(
initial_request.offset,
initial_request.offset + initial_request.length,
);
let mut download_status = shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.add_range(&requested_range);
}
@ -452,18 +425,15 @@ pub(super) async fn audio_file_fetch(
}
}
if fetch.get_download_strategy()? == DownloadStrategy::Streaming() {
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
let number_of_open_requests =
fetch.shared.number_of_open_requests.load(Ordering::SeqCst);
if number_of_open_requests < MAX_PREFETCH_REQUESTS {
let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests;
let bytes_pending: usize = {
let download_status = fetch
.shared
.download_status
.lock()
.map_err(|_| AudioFileError::Poisoned)?;
let download_status = fetch.shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)

View file

@ -1,6 +1,8 @@
use std::cmp::{max, min};
use std::fmt;
use std::slice::Iter;
use std::{
cmp::{max, min},
fmt,
slice::Iter,
};
#[derive(Copy, Clone, Debug)]
pub struct Range {