1
0
Fork 0
mirror of https://github.com/librespot-org/librespot.git synced 2025-10-06 03:50:06 +02:00

Merge branch 'dev' into new-api-client

This commit is contained in:
Roderick van Domburg 2021-06-26 00:14:20 +02:00
commit 7cd1b7a26a
No known key found for this signature in database
GPG key ID: FE2585E713F9F30A
47 changed files with 2586 additions and 1601 deletions

View file

@ -1,8 +1,9 @@
use std::cmp::{max, min};
use std::io::{Seek, SeekFrom, Write};
use std::sync::{atomic, Arc};
use std::time::Instant;
use std::time::{Duration, Instant};
use atomic::Ordering;
use byteorder::{BigEndian, WriteBytesExt};
use bytes::Bytes;
use futures_util::StreamExt;
@ -17,7 +18,7 @@ use crate::range_set::{Range, RangeSet};
use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand};
use super::{
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME_SECONDS, MAX_PREFETCH_REQUESTS,
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
};
@ -58,7 +59,7 @@ struct PartialFileData {
}
enum ReceivedData {
ResponseTimeMs(usize),
ResponseTime(Duration),
Data(PartialFileData),
}
@ -75,7 +76,7 @@ async fn receive_data(
let old_number_of_request = shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
.fetch_add(1, Ordering::SeqCst);
let mut measure_ping_time = old_number_of_request == 0;
@ -87,14 +88,11 @@ async fn receive_data(
};
if measure_ping_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
let mut duration = Instant::now() - request_sent_time;
if duration > MAXIMUM_ASSUMED_PING_TIME {
duration = MAXIMUM_ASSUMED_PING_TIME;
}
let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize));
let _ = file_data_tx.send(ReceivedData::ResponseTime(duration));
measure_ping_time = false;
}
let data_size = data.len();
@ -128,7 +126,7 @@ async fn receive_data(
shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
.fetch_sub(1, Ordering::SeqCst);
if result.is_err() {
warn!(
@ -150,7 +148,7 @@ struct AudioFileFetch {
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
network_response_times: Vec<Duration>,
}
// Might be replaced by enum from std once stable
@ -238,7 +236,7 @@ impl AudioFileFetch {
// download data from after the current read position first
let mut tail_end = RangeSet::new();
let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
let read_position = self.shared.read_position.load(Ordering::Relaxed);
tail_end.add_range(&Range::new(
read_position,
self.shared.file_size - read_position,
@ -268,26 +266,23 @@ impl AudioFileFetch {
fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow {
match data {
ReceivedData::ResponseTimeMs(response_time_ms) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);
ReceivedData::ResponseTime(response_time) => {
trace!("Ping time estimated as: {}ms", response_time.as_millis());
// record the response time
self.network_response_times_ms.push(response_time_ms);
// prune old response times. Keep at most three.
while self.network_response_times_ms.len() > 3 {
self.network_response_times_ms.remove(0);
// prune old response times. Keep at most two so we can push a third.
while self.network_response_times.len() >= 3 {
self.network_response_times.remove(0);
}
// record the response time
self.network_response_times.push(response_time);
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms: usize = match self.network_response_times_ms.len() {
1 => self.network_response_times_ms[0] as usize,
2 => {
((self.network_response_times_ms[0] + self.network_response_times_ms[1])
/ 2) as usize
}
let ping_time = match self.network_response_times.len() {
1 => self.network_response_times[0],
2 => (self.network_response_times[0] + self.network_response_times[1]) / 2,
3 => {
let mut times = self.network_response_times_ms.clone();
let mut times = self.network_response_times.clone();
times.sort_unstable();
times[1]
}
@ -297,7 +292,7 @@ impl AudioFileFetch {
// store our new estimate for everyone to see
self.shared
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
.store(ping_time.as_millis() as usize, Ordering::Relaxed);
}
ReceivedData::Data(data) => {
self.output
@ -391,7 +386,7 @@ pub(super) async fn audio_file_fetch(
file_data_tx,
complete_tx: Some(complete_tx),
network_response_times_ms: Vec::new(),
network_response_times: Vec::with_capacity(3),
};
loop {
@ -409,10 +404,8 @@ pub(super) async fn audio_file_fetch(
}
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
let number_of_open_requests = fetch
.shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst);
let number_of_open_requests =
fetch.shared.number_of_open_requests.load(Ordering::SeqCst);
if number_of_open_requests < MAX_PREFETCH_REQUESTS {
let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests;
@ -425,14 +418,15 @@ pub(super) async fn audio_file_fetch(
};
let ping_time_seconds =
0.001 * fetch.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
Duration::from_millis(fetch.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
.as_secs_f32();
let download_rate = fetch.session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR
* ping_time_seconds
* fetch.shared.stream_data_rate as f64) as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
* fetch.shared.stream_data_rate as f32) as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32)
as usize,
);