Merge branch 'ci/rust-rpc' of github.com:hydralauncher/hydra into ci/rust-rpc

Author: Chubby Granny Chaser
Date: 2025-04-11 18:08:18 +01:00


@@ -26,6 +26,7 @@ const DEFAULT_RESUME_ONLY: bool = false;
 const HEADER_SIZE: usize = 4096;
 const MAGIC_NUMBER: &[u8; 5] = b"HYDRA";
 const FORMAT_VERSION: u8 = 1;
+const FINALIZE_BUFFER_SIZE: usize = 1024 * 1024;
 
 #[derive(Parser)]
 #[command(name = "hydra-httpdl")]
@@ -233,7 +234,8 @@ impl HydraHeader {
     }
 
     fn get_incomplete_chunks(&self) -> Vec<(u64, u64)> {
-        let mut chunks = Vec::new();
+        let incomplete_count = self.chunk_count as usize - self.chunks_bitmap.count_ones();
+        let mut chunks = Vec::with_capacity(incomplete_count);
         let chunk_size = self.chunk_size as u64;
 
         for i in 0..self.chunk_count as usize {
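
The new preallocation sizes the vector from the number of bits still unset in the chunk bitmap, so collecting the incomplete chunks never reallocates mid-scan. A standalone sketch of the same idea, using a plain bool slice in place of the real chunks_bitmap type (hypothetical helper, not repo code):

fn incomplete_indices(bitmap: &[bool]) -> Vec<usize> {
    // Capacity = chunks not yet marked complete, so the loop below never reallocates.
    let done = bitmap.iter().filter(|&&b| b).count();
    let mut incomplete = Vec::with_capacity(bitmap.len() - done);
    for (i, &complete) in bitmap.iter().enumerate() {
        if !complete {
            incomplete.push(i);
        }
    }
    incomplete
}

fn main() {
    let bitmap = [true, false, true, false, false];
    println!("{:?}", incomplete_indices(&bitmap)); // [1, 3, 4]
}
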
@@ -422,6 +424,8 @@ impl Downloader {
             let progress_clone = progress.bar.clone();
             let filename = real_filename.clone();
 
+            let (log_cancel_tx, mut log_cancel_rx) = tokio::sync::oneshot::channel();
+
             let log_task = tokio::spawn(async move {
                 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
                 let tracker = ProgressTracker {
@@ -429,22 +433,28 @@ impl Downloader {
                 };
 
                 loop {
-                    interval.tick().await;
-                    if let Some(stats) = tracker.get_stats() {
-                        let json_output = json!({
-                            "progress": stats.progress_percent,
-                            "speed_bps": stats.speed_bytes_per_sec,
-                            "downloaded_bytes": stats.bytes_downloaded,
-                            "total_bytes": stats.total_size,
-                            "eta_seconds": stats.eta_seconds,
-                            "elapsed_seconds": stats.elapsed_seconds,
-                            "filename": filename
-                        });
-                        println!("{}", json_output);
-                    }
+                    tokio::select! {
+                        _ = interval.tick() => {
+                            if let Some(stats) = tracker.get_stats() {
+                                let json_output = json!({
+                                    "progress": stats.progress_percent,
+                                    "speed_bps": stats.speed_bytes_per_sec,
+                                    "downloaded_bytes": stats.bytes_downloaded,
+                                    "total_bytes": stats.total_size,
+                                    "eta_seconds": stats.eta_seconds,
+                                    "elapsed_seconds": stats.elapsed_seconds,
+                                    "filename": filename
+                                });
+                                println!("{}", json_output);
+                            }
+                        }
+                        _ = &mut log_cancel_rx => {
+                            break;
+                        }
+                    }
                 }
             });
-            Some(log_task)
+            Some((log_task, log_cancel_tx))
         } else {
             None
         };
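
For reference, the shutdown pattern introduced above (a oneshot receiver polled alongside the interval inside tokio::select!, so the logging task exits on request instead of being aborted) can be exercised on its own. A minimal sketch assuming the tokio runtime; this is illustrative code, not code from the repository:

use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();

    let ticker = tokio::spawn(async move {
        let mut interval = interval(Duration::from_millis(100));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Periodic work; in the downloader this is the JSON progress line.
                    println!("tick");
                }
                // Completes when the sender fires (or is dropped): leave the loop cleanly.
                _ = &mut cancel_rx => break,
            }
        }
    });

    sleep(Duration::from_millis(350)).await;
    let _ = cancel_tx.send(()); // request shutdown rather than calling abort()
    let _ = ticker.await;       // wait for the task to finish its current iteration
}
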
@@ -499,8 +509,9 @@ impl Downloader {
         progress.finish();
 
-        if let Some(log_handle) = log_progress {
-            log_handle.abort();
+        if let Some((log_handle, log_cancel_tx)) = log_progress {
+            let _ = log_cancel_tx.send(());
+            let _ = log_handle.await;
         }
 
         let manager = resume_manager.lock().await;
@@ -542,7 +553,7 @@ impl Downloader {
                     return Err(e);
                 }
                 tokio::time::sleep(tokio::time::Duration::from_millis(
-                    RETRY_BACKOFF_MS * retries as u64,
+                    RETRY_BACKOFF_MS * (2_u64.pow(retries as u32 - 1)),
                 ))
                 .await;
             }
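
The retry delay moves from a linear schedule (base * attempt) to an exponential one (base * 2^(attempt - 1)); the formula assumes retries is at least 1 when the sleep is reached, since the exponent would underflow at zero. A small sketch of the resulting schedule, using an illustrative base value rather than the crate's actual RETRY_BACKOFF_MS:

// Illustrative constant; the downloader defines its own RETRY_BACKOFF_MS.
const RETRY_BACKOFF_MS: u64 = 500;

fn backoff_ms(retries: u32) -> u64 {
    // Same shape as the diff: base * 2^(retries - 1), so the first retry waits one base period.
    RETRY_BACKOFF_MS * 2_u64.pow(retries - 1)
}

fn main() {
    for retries in 1..=5 {
        println!("retry {retries}: wait {} ms", backoff_ms(retries));
    }
    // Prints 500, 1000, 2000, 4000, 8000.
}
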
@@ -609,6 +620,28 @@ impl Downloader {
     async fn get_file_info(&self) -> Result<(u64, Option<String>, String)> {
         let resp = self.client.head(&self.config.url).send().await?;
 
+        let accepts_ranges = resp
+            .headers()
+            .get("accept-ranges")
+            .and_then(|v| v.to_str().ok())
+            .map(|v| v.contains("bytes"))
+            .unwrap_or(false);
+
+        if !accepts_ranges {
+            let range_check = self
+                .client
+                .get(&self.config.url)
+                .header("Range", "bytes=0-0")
+                .send()
+                .await?;
+
+            if range_check.status() != StatusCode::PARTIAL_CONTENT {
+                anyhow::bail!(
+                    "Server does not support Range requests, cannot continue with parallel download"
+                );
+            }
+        }
+
         let file_size = if let Some(content_length) = resp.headers().get("content-length") {
             content_length.to_str()?.parse()?
         } else {
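
The added check trusts an explicit Accept-Ranges: bytes header and only issues a probe request when the header is absent, since some servers honour Range without advertising it; a plain 200 on the probe means the whole body would be returned, so parallel chunked download is refused. A standalone sketch of the same probe, assuming reqwest and tokio, with an example URL (hypothetical helper, not repo code):

use reqwest::{Client, StatusCode};

// Returns true if the server either advertises byte ranges or answers a
// one-byte Range request with 206 Partial Content.
async fn supports_ranges(client: &Client, url: &str) -> reqwest::Result<bool> {
    let head = client.head(url).send().await?;
    let advertised = head
        .headers()
        .get("accept-ranges")
        .and_then(|v| v.to_str().ok())
        .map(|v| v.contains("bytes"))
        .unwrap_or(false);
    if advertised {
        return Ok(true);
    }

    // Ask for the first byte only; 206 Partial Content confirms Range support.
    let probe = client
        .get(url)
        .header("Range", "bytes=0-0")
        .send()
        .await?;
    Ok(probe.status() == StatusCode::PARTIAL_CONTENT)
}

#[tokio::main]
async fn main() -> reqwest::Result<()> {
    let client = Client::new();
    let ok = supports_ranges(&client, "https://example.com/file.bin").await?;
    println!("range requests supported: {ok}");
    Ok(())
}
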
@@ -799,17 +832,23 @@ impl ResumeManager {
         let source = File::open(&self.file_path)?;
         let dest = File::create(&temp_path)?;
 
-        let mut reader = BufReader::new(source);
-        let mut writer = BufWriter::new(dest);
+        let mut reader = BufReader::with_capacity(FINALIZE_BUFFER_SIZE, source);
+        let mut writer = BufWriter::with_capacity(FINALIZE_BUFFER_SIZE, dest);
 
         reader.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
         std::io::copy(&mut reader, &mut writer)?;
 
         writer.flush()?;
         drop(writer);
 
-        std::fs::rename(temp_path, &self.file_path)?;
-        Ok(())
+        match std::fs::rename(&temp_path, &self.file_path) {
+            Ok(_) => Ok(()),
+            Err(_) => {
+                let _ = std::fs::remove_file(&self.file_path);
+                std::fs::rename(&temp_path, &self.file_path)?;
+                Ok(())
+            }
+        }
     }
 }
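
The finalize step now tolerates the direct rename failing to replace an existing destination (which can happen on Windows when the target file is still open elsewhere): it deletes the destination and retries the rename once. A minimal standalone sketch of that fallback, with made-up file names; the helper is hypothetical, not code from the repository:

use std::fs;
use std::io;
use std::path::Path;

// Mirrors the fallback above: try the rename, and on failure clear the
// destination and retry exactly once.
fn rename_replacing(from: &Path, to: &Path) -> io::Result<()> {
    match fs::rename(from, to) {
        Ok(()) => Ok(()),
        Err(_) => {
            // If the destination could not be replaced in place, remove it and retry.
            let _ = fs::remove_file(to);
            fs::rename(from, to)
        }
    }
}

fn main() -> io::Result<()> {
    // Illustrative paths only.
    fs::write("download.tmp", b"finalized contents")?;
    fs::write("download.bin", b"stale contents")?;
    rename_replacing(Path::new("download.tmp"), Path::new("download.bin"))?;
    println!("{}", fs::read_to_string("download.bin")?);
    Ok(())
}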