From cd367faec21a9eb45774b158ae37097215021f31 Mon Sep 17 00:00:00 2001 From: Hachi-R Date: Fri, 11 Apr 2025 13:46:41 -0300 Subject: [PATCH 1/5] fix: oneshot channel --- rust_rpc/src/main.rs | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/rust_rpc/src/main.rs b/rust_rpc/src/main.rs index 958708ba..5fba6ad4 100644 --- a/rust_rpc/src/main.rs +++ b/rust_rpc/src/main.rs @@ -422,6 +422,8 @@ impl Downloader { let progress_clone = progress.bar.clone(); let filename = real_filename.clone(); + let (log_cancel_tx, mut log_cancel_rx) = tokio::sync::oneshot::channel(); + let log_task = tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); let tracker = ProgressTracker { @@ -429,22 +431,28 @@ impl Downloader { }; loop { - interval.tick().await; - if let Some(stats) = tracker.get_stats() { - let json_output = json!({ - "progress": stats.progress_percent, - "speed_bps": stats.speed_bytes_per_sec, - "downloaded_bytes": stats.bytes_downloaded, - "total_bytes": stats.total_size, - "eta_seconds": stats.eta_seconds, - "elapsed_seconds": stats.elapsed_seconds, - "filename": filename - }); - println!("{}", json_output); + tokio::select! { + _ = interval.tick() => { + if let Some(stats) = tracker.get_stats() { + let json_output = json!({ + "progress": stats.progress_percent, + "speed_bps": stats.speed_bytes_per_sec, + "downloaded_bytes": stats.bytes_downloaded, + "total_bytes": stats.total_size, + "eta_seconds": stats.eta_seconds, + "elapsed_seconds": stats.elapsed_seconds, + "filename": filename + }); + println!("{}", json_output); + } + } + _ = &mut log_cancel_rx => { + break; + } } } }); - Some(log_task) + Some((log_task, log_cancel_tx)) } else { None }; @@ -499,8 +507,9 @@ impl Downloader { progress.finish(); - if let Some(log_handle) = log_progress { - log_handle.abort(); + if let Some((log_handle, log_cancel_tx)) = log_progress { + let _ = log_cancel_tx.send(()); + let _ = log_handle.await; } let manager = resume_manager.lock().await; From e27536c6b3fd3e48971177d8e4de0a11b325f59c Mon Sep 17 00:00:00 2001 From: Hachi-R Date: Fri, 11 Apr 2025 13:49:16 -0300 Subject: [PATCH 2/5] feat: chunks vector allocation --- rust_rpc/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust_rpc/src/main.rs b/rust_rpc/src/main.rs index 5fba6ad4..1e80e442 100644 --- a/rust_rpc/src/main.rs +++ b/rust_rpc/src/main.rs @@ -233,7 +233,8 @@ impl HydraHeader { } fn get_incomplete_chunks(&self) -> Vec<(u64, u64)> { - let mut chunks = Vec::new(); + let incomplete_count = self.chunk_count as usize - self.chunks_bitmap.count_ones(); + let mut chunks = Vec::with_capacity(incomplete_count); let chunk_size = self.chunk_size as u64; for i in 0..self.chunk_count as usize { From d2a868b504f4f13e9dc360c87c0489f4c59e1b61 Mon Sep 17 00:00:00 2001 From: Hachi-R Date: Fri, 11 Apr 2025 13:51:32 -0300 Subject: [PATCH 3/5] fix: update retry backoff --- rust_rpc/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust_rpc/src/main.rs b/rust_rpc/src/main.rs index 1e80e442..af0211ae 100644 --- a/rust_rpc/src/main.rs +++ b/rust_rpc/src/main.rs @@ -552,7 +552,7 @@ impl Downloader { return Err(e); } tokio::time::sleep(tokio::time::Duration::from_millis( - RETRY_BACKOFF_MS * retries as u64, + RETRY_BACKOFF_MS * (2_u64.pow(retries as u32 - 1)), )) .await; } From 555b3dbb1db10df0e395c4b4eca43b2bcde76632 Mon Sep 17 00:00:00 2001 From: Hachi-R Date: Fri, 11 Apr 2025 14:00:22 -0300 Subject: [PATCH 4/5] fix: improve file rename handling --- rust_rpc/src/main.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/rust_rpc/src/main.rs b/rust_rpc/src/main.rs index af0211ae..11f05326 100644 --- a/rust_rpc/src/main.rs +++ b/rust_rpc/src/main.rs @@ -26,6 +26,7 @@ const DEFAULT_RESUME_ONLY: bool = false; const HEADER_SIZE: usize = 4096; const MAGIC_NUMBER: &[u8; 5] = b"HYDRA"; const FORMAT_VERSION: u8 = 1; +const FINALIZE_BUFFER_SIZE: usize = 1024 * 1024; #[derive(Parser)] #[command(name = "hydra-httpdl")] @@ -809,17 +810,23 @@ impl ResumeManager { let source = File::open(&self.file_path)?; let dest = File::create(&temp_path)?; - let mut reader = BufReader::new(source); - let mut writer = BufWriter::new(dest); + let mut reader = BufReader::with_capacity(FINALIZE_BUFFER_SIZE, source); + let mut writer = BufWriter::with_capacity(FINALIZE_BUFFER_SIZE, dest); reader.seek(SeekFrom::Start(HEADER_SIZE as u64))?; std::io::copy(&mut reader, &mut writer)?; writer.flush()?; + drop(writer); - std::fs::rename(temp_path, &self.file_path)?; - - Ok(()) + match std::fs::rename(&temp_path, &self.file_path) { + Ok(_) => Ok(()), + Err(_) => { + let _ = std::fs::remove_file(&self.file_path); + std::fs::rename(&temp_path, &self.file_path)?; + Ok(()) + } + } } } From 8c442e742a40a41331ec354e9e44d7f6f58ce9d6 Mon Sep 17 00:00:00 2001 From: Hachi-R Date: Fri, 11 Apr 2025 14:02:06 -0300 Subject: [PATCH 5/5] fix: add range request support validation --- rust_rpc/src/main.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/rust_rpc/src/main.rs b/rust_rpc/src/main.rs index 11f05326..1908ce87 100644 --- a/rust_rpc/src/main.rs +++ b/rust_rpc/src/main.rs @@ -620,6 +620,28 @@ impl Downloader { async fn get_file_info(&self) -> Result<(u64, Option, String)> { let resp = self.client.head(&self.config.url).send().await?; + let accepts_ranges = resp + .headers() + .get("accept-ranges") + .and_then(|v| v.to_str().ok()) + .map(|v| v.contains("bytes")) + .unwrap_or(false); + + if !accepts_ranges { + let range_check = self + .client + .get(&self.config.url) + .header("Range", "bytes=0-0") + .send() + .await?; + + if range_check.status() != StatusCode::PARTIAL_CONTENT { + anyhow::bail!( + "Server does not support Range requests, cannot continue with parallel download" + ); + } + } + let file_size = if let Some(content_length) = resp.headers().get("content-length") { content_length.to_str()?.parse()? } else {