diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index dd7ae51a..8ca37c64 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -201,6 +201,7 @@ struct AudioFileShared { file_size: usize, cond: Condvar, download_status: Mutex, + number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, } @@ -213,6 +214,7 @@ impl AudioFileOpenStreaming { file_size: size, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), }); @@ -433,6 +435,7 @@ struct AudioFileFetchDataReceiver { data_offset: usize, request_length: usize, request_sent_time: Option, + measure_ping_time: bool, } impl AudioFileFetchDataReceiver { @@ -445,6 +448,10 @@ impl AudioFileFetchDataReceiver { request_sent_time: Instant, ) -> AudioFileFetchDataReceiver { + let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0; + + shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst); + AudioFileFetchDataReceiver { shared: shared, data_rx: data_rx, @@ -452,6 +459,7 @@ impl AudioFileFetchDataReceiver { data_offset: data_offset, request_length: request_length, request_sent_time: Some(request_sent_time), + measure_ping_time: measure_ping_time, } } } @@ -468,6 +476,9 @@ impl AudioFileFetchDataReceiver { download_status.requested.subtract_range(&missing_range); self.shared.cond.notify_all(); } + + self.shared.number_of_open_requests.fetch_sub(1, atomic::Ordering::SeqCst); + } } @@ -480,15 +491,18 @@ impl Future for AudioFileFetchDataReceiver { trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length); match self.data_rx.poll() { Ok(Async::Ready(Some(data))) => { - if let Some(request_sent_time) = self.request_sent_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; - }else { - duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64; + if self.measure_ping_time { + if let Some(request_sent_time) = self.request_sent_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + } else { + duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64; + } + let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + self.measure_ping_time = false; } - let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); } let data_size = data.len(); trace!("data_receiver got {} bytes of data", data_size); @@ -502,14 +516,15 @@ impl Future for AudioFileFetchDataReceiver { } if self.request_length == 0 { trace!("Data receiver completed at position {}", self.data_offset); + self.finish(); return Ok(Async::Ready(())); } } Ok(Async::Ready(None)) => { if self.request_length > 0 { warn!("Received less data from server than requested."); - self.finish(); } + self.finish(); return Ok(Async::Ready(())); } Ok(Async::NotReady) => {