mirror of
https://github.com/librespot-org/librespot.git
synced 2025-10-06 03:50:06 +02:00
Various loading improvements
- Improve responsiveness by downloading the smallest possible chunk size when seeking or first loading. - Improve download time and decrease CPU usage by downloading the largest possible chunk size as throughput allows, still allowing for reasonable seek responsiveness (~1 second). - As a result, take refactoring opportunities: simplify prefetching logic, download threading, command sending, and some ergonomics. - Fix disappearing controls in the Spotify mobile UI while loading. - Fix handling of seek, pause, and play commands while loading. - Fix download rate calculation (don't use the Mercury rate). - Fix ping time calculation under lock contention.
This commit is contained in:
parent
cce1b966cb
commit
eb1472c713
7 changed files with 305 additions and 279 deletions
|
@ -1,7 +1,7 @@
|
|||
use std::{
|
||||
cmp::{max, min},
|
||||
io::{Seek, SeekFrom, Write},
|
||||
sync::{atomic::Ordering, Arc},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
|
@ -17,8 +17,8 @@ use crate::range_set::{Range, RangeSet};
|
|||
|
||||
use super::{
|
||||
AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest,
|
||||
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
|
||||
MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
|
||||
MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT,
|
||||
PREFETCH_THRESHOLD_FACTOR,
|
||||
};
|
||||
|
||||
struct PartialFileData {
|
||||
|
@ -27,10 +27,13 @@ struct PartialFileData {
|
|||
}
|
||||
|
||||
enum ReceivedData {
|
||||
Throughput(usize),
|
||||
ResponseTime(Duration),
|
||||
Data(PartialFileData),
|
||||
}
|
||||
|
||||
const ONE_SECOND: Duration = Duration::from_secs(1);
|
||||
|
||||
async fn receive_data(
|
||||
shared: Arc<AudioFileShared>,
|
||||
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
|
||||
|
@ -39,15 +42,21 @@ async fn receive_data(
|
|||
let mut offset = request.offset;
|
||||
let mut actual_length = 0;
|
||||
|
||||
let old_number_of_request = shared
|
||||
.number_of_open_requests
|
||||
.fetch_add(1, Ordering::SeqCst);
|
||||
let permit = shared.download_slots.acquire().await?;
|
||||
|
||||
let mut measure_ping_time = old_number_of_request == 0;
|
||||
let request_time = Instant::now();
|
||||
let mut measure_ping_time = true;
|
||||
let mut measure_throughput = true;
|
||||
|
||||
let result: Result<_, Error> = loop {
|
||||
let response = match request.initial_response.take() {
|
||||
Some(data) => data,
|
||||
Some(data) => {
|
||||
// the request was already made outside of this function
|
||||
measure_ping_time = false;
|
||||
measure_throughput = false;
|
||||
|
||||
data
|
||||
}
|
||||
None => match request.streamer.next().await {
|
||||
Some(Ok(response)) => response,
|
||||
Some(Err(e)) => break Err(e.into()),
|
||||
|
@ -62,6 +71,15 @@ async fn receive_data(
|
|||
},
|
||||
};
|
||||
|
||||
if measure_ping_time {
|
||||
let duration = Instant::now().duration_since(request_time);
|
||||
// may be zero if we are handling an initial response
|
||||
if duration.as_millis() > 0 {
|
||||
file_data_tx.send(ReceivedData::ResponseTime(duration))?;
|
||||
measure_ping_time = false;
|
||||
}
|
||||
}
|
||||
|
||||
let code = response.status();
|
||||
if code != StatusCode::PARTIAL_CONTENT {
|
||||
if code == StatusCode::TOO_MANY_REQUESTS {
|
||||
|
@ -90,24 +108,18 @@ async fn receive_data(
|
|||
|
||||
actual_length += data_size;
|
||||
offset += data_size;
|
||||
|
||||
if measure_ping_time {
|
||||
let mut duration = Instant::now() - request.request_time;
|
||||
if duration > MAXIMUM_ASSUMED_PING_TIME {
|
||||
warn!(
|
||||
"Ping time {} ms exceeds maximum {}, setting to maximum",
|
||||
duration.as_millis(),
|
||||
MAXIMUM_ASSUMED_PING_TIME.as_millis()
|
||||
);
|
||||
duration = MAXIMUM_ASSUMED_PING_TIME;
|
||||
}
|
||||
file_data_tx.send(ReceivedData::ResponseTime(duration))?;
|
||||
measure_ping_time = false;
|
||||
}
|
||||
};
|
||||
|
||||
drop(request.streamer);
|
||||
|
||||
if measure_throughput {
|
||||
let duration = Instant::now().duration_since(request_time).as_millis();
|
||||
if actual_length > 0 && duration > 0 {
|
||||
let throughput = ONE_SECOND.as_millis() as usize * actual_length / duration as usize;
|
||||
file_data_tx.send(ReceivedData::Throughput(throughput))?;
|
||||
}
|
||||
}
|
||||
|
||||
let bytes_remaining = request.length - actual_length;
|
||||
if bytes_remaining > 0 {
|
||||
{
|
||||
|
@ -118,9 +130,7 @@ async fn receive_data(
|
|||
}
|
||||
}
|
||||
|
||||
shared
|
||||
.number_of_open_requests
|
||||
.fetch_sub(1, Ordering::SeqCst);
|
||||
drop(permit);
|
||||
|
||||
if let Err(e) = result {
|
||||
error!(
|
||||
|
@ -151,8 +161,8 @@ enum ControlFlow {
|
|||
}
|
||||
|
||||
impl AudioFileFetch {
|
||||
fn is_download_streaming(&self) -> bool {
|
||||
self.shared.download_streaming.load(Ordering::Acquire)
|
||||
fn has_download_slots_available(&self) -> bool {
|
||||
self.shared.download_slots.available_permits() > 0
|
||||
}
|
||||
|
||||
fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
|
||||
|
@ -160,10 +170,17 @@ impl AudioFileFetch {
|
|||
length = MINIMUM_DOWNLOAD_SIZE;
|
||||
}
|
||||
|
||||
// If we are in streaming mode (so not seeking) then start downloading as large
|
||||
// of chunks as possible for better throughput and improved CPU usage, while
|
||||
// still being reasonably responsive (~1 second) in case we want to seek.
|
||||
if self.shared.is_download_streaming() {
|
||||
let throughput = self.shared.throughput();
|
||||
length = max(length, throughput);
|
||||
}
|
||||
|
||||
if offset + length > self.shared.file_size {
|
||||
length = self.shared.file_size - offset;
|
||||
}
|
||||
|
||||
let mut ranges_to_request = RangeSet::new();
|
||||
ranges_to_request.add_range(&Range::new(offset, length));
|
||||
|
||||
|
@ -191,7 +208,6 @@ impl AudioFileFetch {
|
|||
initial_response: None,
|
||||
offset: range.start,
|
||||
length: range.length,
|
||||
request_time: Instant::now(),
|
||||
};
|
||||
|
||||
self.session.spawn(receive_data(
|
||||
|
@ -204,51 +220,36 @@ impl AudioFileFetch {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn pre_fetch_more_data(
|
||||
&mut self,
|
||||
bytes: usize,
|
||||
max_requests_to_send: usize,
|
||||
) -> AudioFileResult {
|
||||
let mut bytes_to_go = bytes;
|
||||
let mut requests_to_go = max_requests_to_send;
|
||||
fn pre_fetch_more_data(&mut self, bytes: usize) -> AudioFileResult {
|
||||
// determine what is still missing
|
||||
let mut missing_data = RangeSet::new();
|
||||
missing_data.add_range(&Range::new(0, self.shared.file_size));
|
||||
{
|
||||
let download_status = self.shared.download_status.lock();
|
||||
missing_data.subtract_range_set(&download_status.downloaded);
|
||||
missing_data.subtract_range_set(&download_status.requested);
|
||||
}
|
||||
|
||||
while bytes_to_go > 0 && requests_to_go > 0 {
|
||||
// determine what is still missing
|
||||
let mut missing_data = RangeSet::new();
|
||||
missing_data.add_range(&Range::new(0, self.shared.file_size));
|
||||
{
|
||||
let download_status = self.shared.download_status.lock();
|
||||
missing_data.subtract_range_set(&download_status.downloaded);
|
||||
missing_data.subtract_range_set(&download_status.requested);
|
||||
}
|
||||
// download data from after the current read position first
|
||||
let mut tail_end = RangeSet::new();
|
||||
let read_position = self.shared.read_position();
|
||||
tail_end.add_range(&Range::new(
|
||||
read_position,
|
||||
self.shared.file_size - read_position,
|
||||
));
|
||||
let tail_end = tail_end.intersection(&missing_data);
|
||||
|
||||
// download data from after the current read position first
|
||||
let mut tail_end = RangeSet::new();
|
||||
let read_position = self.shared.read_position.load(Ordering::Acquire);
|
||||
tail_end.add_range(&Range::new(
|
||||
read_position,
|
||||
self.shared.file_size - read_position,
|
||||
));
|
||||
let tail_end = tail_end.intersection(&missing_data);
|
||||
|
||||
if !tail_end.is_empty() {
|
||||
let range = tail_end.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes_to_go);
|
||||
self.download_range(offset, length)?;
|
||||
requests_to_go -= 1;
|
||||
bytes_to_go -= length;
|
||||
} else if !missing_data.is_empty() {
|
||||
// ok, the tail is downloaded, download something fom the beginning.
|
||||
let range = missing_data.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes_to_go);
|
||||
self.download_range(offset, length)?;
|
||||
requests_to_go -= 1;
|
||||
bytes_to_go -= length;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
if !tail_end.is_empty() {
|
||||
let range = tail_end.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes);
|
||||
self.download_range(offset, length)?;
|
||||
} else if !missing_data.is_empty() {
|
||||
// ok, the tail is downloaded, download something fom the beginning.
|
||||
let range = missing_data.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes);
|
||||
self.download_range(offset, length)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -256,8 +257,46 @@ impl AudioFileFetch {
|
|||
|
||||
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> {
|
||||
match data {
|
||||
ReceivedData::ResponseTime(response_time) => {
|
||||
let old_ping_time_ms = self.shared.ping_time_ms.load(Ordering::Relaxed);
|
||||
ReceivedData::Throughput(mut throughput) => {
|
||||
if throughput < MINIMUM_THROUGHPUT {
|
||||
warn!(
|
||||
"Throughput {} kbps lower than minimum {}, setting to minimum",
|
||||
throughput / 1000,
|
||||
MINIMUM_THROUGHPUT / 1000,
|
||||
);
|
||||
throughput = MINIMUM_THROUGHPUT;
|
||||
}
|
||||
|
||||
let old_throughput = self.shared.throughput();
|
||||
let avg_throughput = if old_throughput > 0 {
|
||||
(old_throughput + throughput) / 2
|
||||
} else {
|
||||
throughput
|
||||
};
|
||||
|
||||
// print when the new estimate deviates by more than 10% from the last
|
||||
if f32::abs((avg_throughput as f32 - old_throughput as f32) / old_throughput as f32)
|
||||
> 0.1
|
||||
{
|
||||
trace!(
|
||||
"Throughput now estimated as: {} kbps",
|
||||
avg_throughput / 1000
|
||||
);
|
||||
}
|
||||
|
||||
self.shared.set_throughput(avg_throughput);
|
||||
}
|
||||
ReceivedData::ResponseTime(mut response_time) => {
|
||||
if response_time > MAXIMUM_ASSUMED_PING_TIME {
|
||||
warn!(
|
||||
"Time to first byte {} ms exceeds maximum {}, setting to maximum",
|
||||
response_time.as_millis(),
|
||||
MAXIMUM_ASSUMED_PING_TIME.as_millis()
|
||||
);
|
||||
response_time = MAXIMUM_ASSUMED_PING_TIME;
|
||||
}
|
||||
|
||||
let old_ping_time_ms = self.shared.ping_time().as_millis();
|
||||
|
||||
// prune old response times. Keep at most two so we can push a third.
|
||||
while self.network_response_times.len() >= 3 {
|
||||
|
@ -268,8 +307,8 @@ impl AudioFileFetch {
|
|||
self.network_response_times.push(response_time);
|
||||
|
||||
// stats::median is experimental. So we calculate the median of up to three ourselves.
|
||||
let ping_time_ms = {
|
||||
let response_time = match self.network_response_times.len() {
|
||||
let ping_time = {
|
||||
match self.network_response_times.len() {
|
||||
1 => self.network_response_times[0],
|
||||
2 => (self.network_response_times[0] + self.network_response_times[1]) / 2,
|
||||
3 => {
|
||||
|
@ -278,22 +317,23 @@ impl AudioFileFetch {
|
|||
times[1]
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
response_time.as_millis() as usize
|
||||
}
|
||||
};
|
||||
|
||||
// print when the new estimate deviates by more than 10% from the last
|
||||
if f32::abs(
|
||||
(ping_time_ms as f32 - old_ping_time_ms as f32) / old_ping_time_ms as f32,
|
||||
(ping_time.as_millis() as f32 - old_ping_time_ms as f32)
|
||||
/ old_ping_time_ms as f32,
|
||||
) > 0.1
|
||||
{
|
||||
debug!("Ping time now estimated as: {} ms", ping_time_ms);
|
||||
trace!(
|
||||
"Time to first byte now estimated as: {} ms",
|
||||
ping_time.as_millis()
|
||||
);
|
||||
}
|
||||
|
||||
// store our new estimate for everyone to see
|
||||
self.shared
|
||||
.ping_time_ms
|
||||
.store(ping_time_ms, Ordering::Relaxed);
|
||||
self.shared.set_ping_time(ping_time);
|
||||
}
|
||||
ReceivedData::Data(data) => {
|
||||
match self.output.as_mut() {
|
||||
|
@ -333,14 +373,6 @@ impl AudioFileFetch {
|
|||
StreamLoaderCommand::Fetch(request) => {
|
||||
self.download_range(request.start, request.length)?
|
||||
}
|
||||
StreamLoaderCommand::RandomAccessMode => self
|
||||
.shared
|
||||
.download_streaming
|
||||
.store(false, Ordering::Release),
|
||||
StreamLoaderCommand::StreamMode => self
|
||||
.shared
|
||||
.download_streaming
|
||||
.store(true, Ordering::Release),
|
||||
StreamLoaderCommand::Close => return Ok(ControlFlow::Break),
|
||||
}
|
||||
|
||||
|
@ -426,40 +458,28 @@ pub(super) async fn audio_file_fetch(
|
|||
else => (),
|
||||
}
|
||||
|
||||
if fetch.is_download_streaming() {
|
||||
let number_of_open_requests =
|
||||
fetch.shared.number_of_open_requests.load(Ordering::SeqCst);
|
||||
if number_of_open_requests < MAX_PREFETCH_REQUESTS {
|
||||
let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests;
|
||||
if fetch.shared.is_download_streaming() && fetch.has_download_slots_available() {
|
||||
let bytes_pending: usize = {
|
||||
let download_status = fetch.shared.download_status.lock();
|
||||
|
||||
let bytes_pending: usize = {
|
||||
let download_status = fetch.shared.download_status.lock();
|
||||
download_status
|
||||
.requested
|
||||
.minus(&download_status.downloaded)
|
||||
.len()
|
||||
};
|
||||
|
||||
download_status
|
||||
.requested
|
||||
.minus(&download_status.downloaded)
|
||||
.len()
|
||||
};
|
||||
let ping_time_seconds = fetch.shared.ping_time().as_secs_f32();
|
||||
let throughput = fetch.shared.throughput();
|
||||
|
||||
let ping_time_seconds =
|
||||
Duration::from_millis(fetch.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
|
||||
.as_secs_f32();
|
||||
let download_rate = fetch.session.channel().get_download_rate_estimate();
|
||||
let desired_pending_bytes = max(
|
||||
(PREFETCH_THRESHOLD_FACTOR
|
||||
* ping_time_seconds
|
||||
* fetch.shared.bytes_per_second as f32) as usize,
|
||||
(ping_time_seconds * throughput as f32) as usize,
|
||||
);
|
||||
|
||||
let desired_pending_bytes = max(
|
||||
(PREFETCH_THRESHOLD_FACTOR
|
||||
* ping_time_seconds
|
||||
* fetch.shared.bytes_per_second as f32) as usize,
|
||||
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32)
|
||||
as usize,
|
||||
);
|
||||
|
||||
if bytes_pending < desired_pending_bytes {
|
||||
fetch.pre_fetch_more_data(
|
||||
desired_pending_bytes - bytes_pending,
|
||||
max_requests_to_send,
|
||||
)?;
|
||||
}
|
||||
if bytes_pending < desired_pending_bytes {
|
||||
fetch.pre_fetch_more_data(desired_pending_bytes - bytes_pending)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue