Thomas před 3 týdny
rodič
revize
af50b44ea0
změnil 4 soubory, kde provedl 208 přidání a 155 odebrání
  1. +6 −9
      src/annotation/echtvar.rs
  2. +4 −4
      src/annotation/vep.rs
  3. +84 −21
      src/helpers.rs
  4. +114 −121
      src/variant/variant_collection.rs

+ 6 - 9
src/annotation/echtvar.rs

@@ -1,18 +1,15 @@
-use std::{
-    io::{BufRead, BufReader},
-    path::{Path, PathBuf},
-    process::{Command, Stdio},
-};
+use std::path::{Path, PathBuf};
 
-use anyhow::{Context, Ok, Result};
-use log::warn;
+use anyhow::{Ok, Result};
 use uuid::Uuid;
 
 use crate::{
     commands::{
-        CapturedOutput, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner
+        CapturedOutput, Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner,
+        SlurmParams, SlurmRunner,
     },
-    config::Config, run,
+    config::Config,
+    run,
 };
 
 use super::{cosmic::Cosmic, gnomad::GnomAD};

+ 4 - 4
src/annotation/vep.rs

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 use std::{
     cmp::{Ordering, Reverse},
     fmt::Display,
-    path::PathBuf,
+    path::{Path, PathBuf},
     str::FromStr,
 };
 
@@ -648,10 +648,10 @@ pub struct VepJob {
 }
 
 impl VepJob {
-    pub fn new(in_path: &str, out_path: &str, config: &Config) -> Self {
+    pub fn new(in_path: impl AsRef<Path>, out_path: impl AsRef<Path>, config: &Config) -> Self {
         VepJob {
-            in_vcf: in_path.into(),
-            out_vcf: out_path.into(),
+            in_vcf: in_path.as_ref().to_path_buf(),
+            out_vcf: out_path.as_ref().to_path_buf(),
             config: config.clone(),
         }
     }

+ 84 - 21
src/helpers.rs

@@ -5,13 +5,15 @@ use log::{debug, error, warn};
 use rust_htslib::bam::Read;
 use rustc_hash::FxHashMap;
 use serde::{Deserialize, Serialize};
+use uuid::Uuid;
 use std::{
     cmp::Ordering,
     collections::{HashMap, HashSet},
     fmt, fs,
     iter::Sum,
     ops::{Add, Div},
-    path::{Path, PathBuf}, sync::atomic::AtomicBool,
+    path::{Path, PathBuf},
+    sync::atomic::AtomicBool,
 };
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
@@ -80,8 +82,8 @@ pub fn force_or_not(_path: &str, _force: bool) -> anyhow::Result<()> {
     // }
     //
     // if output_exists {
-//     info!("{} already exists.", path.display())
-// }
+    //     info!("{} already exists.", path.display())
+    // }
     Ok(())
 }
 
@@ -275,17 +277,6 @@ pub fn par_intersection<T: Ord + Send + Sync + Clone>(
         })
 }
 
-pub fn temp_file_path(suffix: &str) -> std::io::Result<PathBuf> {
-    let temp_path = tempfile::Builder::new()
-        .prefix("pandora-temp-")
-        .suffix(suffix)
-        .rand_bytes(5)
-        .tempfile()?
-        .into_temp_path();
-
-    Ok(temp_path.to_path_buf())
-}
-
 pub fn estimate_shannon_entropy(dna_sequence: &str) -> f64 {
     let m = dna_sequence.len() as f64;
 
@@ -834,12 +825,47 @@ impl Drop for TempDirGuard {
     }
 }
 
-/// Guard that tracks temporary files and cleans them up on drop unless disarmed.
+/// Guard that tracks temporary filesystem paths and cleans them up on drop
+/// unless explicitly disarmed.
+///
+/// This is intended for pipeline-style code where intermediate outputs
+/// are produced by downstream tools. The guard only tracks paths; it does
+/// **not** create files itself.
+///
+/// If the pipeline completes successfully, call [`disarm`] to prevent
+/// cleanup. Otherwise, all tracked paths are removed on drop on a
+/// best-effort basis.
+///
+/// # Example
+///
+/// ```no_run
+/// use std::path::PathBuf;
+///
+/// let tmp_dir = PathBuf::from("/tmp");
+/// let mut guard = TempFileGuard::new();
+///
+/// let path1 = guard.tmp_path(".tmp", &tmp_dir);
+/// let path2 = guard.tmp_path(".tmp", &tmp_dir);
+///
+/// // Produce outputs at `path1` and `path2`
+///
+/// // Mark success
+/// guard.disarm();
+/// ```
+///
+/// If `disarm()` is not called, both paths are cleaned up automatically
+/// when the guard is dropped.
 pub struct TempFileGuard {
     files: Vec<PathBuf>,
     disarmed: AtomicBool,
 }
 
+impl Default for TempFileGuard {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl TempFileGuard {
     pub fn new() -> Self {
         Self {
@@ -861,11 +887,12 @@ impl TempFileGuard {
     /// Disarms the guard, preventing cleanup on drop.
     /// Call this when the pipeline succeeds.
     pub fn disarm(&self) {
-        self.disarmed.store(true, std::sync::atomic::Ordering::SeqCst);
+        self.disarmed
+            .store(true, std::sync::atomic::Ordering::SeqCst);
     }
 
     /// Manually clean up all tracked files.
-    pub fn cleanup(&self) {
+    pub fn cleanup(&mut self) {
         for file in &self.files {
             if file.exists() {
                 if let Err(e) = remove_bam_with_index(file) {
@@ -873,11 +900,38 @@ impl TempFileGuard {
                 }
             }
         }
+        self.files.clear();
     }
 
     pub fn clear(&mut self) {
         self.files.clear();
     }
+
+    /// Generates a unique temporary path inside `tmp_dir` and registers it
+    /// for cleanup.
+    ///
+    /// The returned path is derived from a UUIDv4 and **no file is created**.
+    /// The caller is responsible for creating or writing to the path.
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// use std::path::PathBuf;
+    ///
+    /// let tmp_dir = PathBuf::from("/tmp");
+    /// let mut guard = TempFileGuard::new();
+    ///
+/// let path = guard.tmp_path(".tmp", &tmp_dir);
+    /// // create or write to `path`
+    /// ```
+    pub fn tmp_path(&mut self, suffix: &str, tmp_dir: impl AsRef<Path>) -> PathBuf {
+        let name = format!("pandora-tmp-{}{}", Uuid::new_v4(), suffix);
+
+        let tmp_dir = tmp_dir.as_ref();
+        let path = tmp_dir.join(name);
+        self.track(path.clone());
+        path
+    }
 }
 
 impl Drop for TempFileGuard {
@@ -892,6 +946,18 @@ impl Drop for TempFileGuard {
     }
 }
 
+// pub fn temp_file_path(suffix: &str, config: &Config) -> std::io::Result<PathBuf> {
+//     let p = config.tmp_dir;
+//     let temp_path = tempfile::Builder::new()
+//         .prefix("pandora-temp-")
+//         .suffix(suffix)
+//         .rand_bytes(5)
+//         .tempfile()?
+//         .into_temp_path();
+//
+//     Ok(temp_path.to_path_buf())
+// }
+
 /// Extracts genome sizes from BAM header.
 pub fn get_genome_sizes(
     header: &rust_htslib::bam::Header,
@@ -915,12 +981,9 @@ pub fn bam_contigs<P: AsRef<std::path::Path>>(bam: P) -> anyhow::Result<Vec<Stri
     let reader = rust_htslib::bam::Reader::from_path(&bam)?;
     let header = rust_htslib::bam::Header::from_template(reader.header());
 
-    Ok(get_genome_sizes(&header)?
-        .into_keys()
-        .collect())
+    Ok(get_genome_sizes(&header)?.into_keys().collect())
 }
 
-
 /// Split genome into ~n_parts equal-sized chunks (by number of bases),
 /// returning for each chunk a list of regions in `ctg:start-end` form.
 /// Split genome into ~n_parts equal-sized chunks (by bases).

+ 114 - 121
src/variant/variant_collection.rs

@@ -2,7 +2,7 @@ use std::{
     collections::{HashMap, HashSet},
     fs::{self, File},
     io::{Read, Write},
-    path::{Path, PathBuf},
+    path::PathBuf,
     sync::Arc,
 };
 
@@ -34,8 +34,8 @@ use crate::{
     },
     config::Config,
     helpers::{
-        app_storage_dir, detect_repetition, estimate_shannon_entropy, mean, temp_file_path,
-        Hash128, Repeat,
+        app_storage_dir, detect_repetition, estimate_shannon_entropy, mean, Hash128, Repeat,
+        TempFileGuard,
     },
     io::{fasta::sequence_at, readers::get_reader, vcf::vcf_header, writers::get_gz_writer},
     positions::{overlaps_par, GenomePosition, GenomeRange, GetGenomePosition},
@@ -1705,8 +1705,10 @@ impl ExternalAnnotation {
         let config: &Config = &self.config;
 
         let mut results: Vec<(Hash128, Vec<VEP>)> = if !unfound.is_empty() {
-            let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
-            let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
+            let optimal_chunk_size = unfound
+                .len()
+                .div_ceil(max_chunks as usize)
+                .max(min_chunk_size);
 
             debug!("{} chunks to process.", unfound.len() / optimal_chunk_size);
             unfound
@@ -1719,7 +1721,7 @@ impl ExternalAnnotation {
                         e
                     })
                 })
-                .collect::<Result<Vec<_>, _>>()? // Collect results into a Result<Vec<_>>
+                .collect::<Result<Vec<_>, _>>()?
                 .into_iter()
                 .flatten()
                 .collect::<Vec<_>>()
@@ -1728,107 +1730,106 @@ impl ExternalAnnotation {
         };
 
         if !sv.is_empty() {
-            let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
-            let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
+            let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize).max(min_chunk_size);
 
-            let results_sv = sv
+            let results_sv: Vec<(Hash128, Vec<VEP>)> = sv
                 .par_chunks(optimal_chunk_size)
-                .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
-                    let in_tmp = temp_file_path(".vcf")
-                        .context("Can't create tmp path for in tmp")?
-                        .to_str()
-                        .unwrap()
-                        .to_string();
-                    let out_vep = temp_file_path("_vep.txt")
-                        .context("Can't create tmp path for in tmp")?
-                        .to_str()
-                        .unwrap()
-                        .to_string();
-
-                    let out_summary = format!("{out_vep}_summary.html");
-                    let out_warnings = format!("{out_vep}_warnings.txt");
-
-                    // Write input VCF
-                    let mut vcf =
-                        File::create(&in_tmp).context("Can't create input vcf file for VEP.")?;
-                    writeln!(vcf, "{}", header)?;
-                    for (i, mut row) in chunk.iter().cloned().enumerate() {
-                        row.id = (i + 1).to_string();
-                        let s = row.into_vcf_row();
-                        writeln!(vcf, "{s}",)?;
-                    }
+                .enumerate()
+                .map(
+                    |(chunk_i, chunk)| -> anyhow::Result<Vec<(Hash128, Vec<VEP>)>> {
+                        debug!("Processing SV chunk {chunk_i}");
+
+                        let mut guard = TempFileGuard::new();
+                        let in_tmp = guard.tmp_path(".vcf", &config.tmp_dir);
+                        let out_vep = guard.tmp_path("_vep.txt", &config.tmp_dir);
+
+                        let out_summary =
+                            PathBuf::from(format!("{}_summary.html", out_vep.display()));
+                        let out_warnings =
+                            PathBuf::from(format!("{}_warnings.txt", out_vep.display()));
+                        guard.track(out_summary.clone());
+                        guard.track(out_warnings.clone());
+
+                        // Write input file
+                        {
+                            let mut vcf =
+                                File::create(&in_tmp).context("Can't create input file.")?;
+                            writeln!(vcf, "{}", header)?;
+                            for (i, mut row) in chunk.iter().cloned().enumerate() {
+                                row.id = (i + 1).to_string();
+                                let s = row.into_vcf_row();
+                                writeln!(vcf, "{s}")?;
+                            }
+                        }
 
-                    let mut vep_job = VepJob::new(&in_tmp, &out_vep, config);
-                    run!(config, &mut vep_job).context("Error while running VEP.")?;
-
-                    let mut reader_vep = ReaderBuilder::new()
-                        .delimiter(b'\t')
-                        .has_headers(false)
-                        .comment(Some(b'#'))
-                        .flexible(true)
-                        .from_reader(
-                            fs::File::open(&out_vep).context("Can't open VEP result file.")?,
-                        );
-
-                    let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
-                    for line in reader_vep.deserialize() {
-                        let line: VepLine = line.context("Failed to deserialize VepLine")?;
-                        let key = line
-                            .uploaded_variation
-                            .parse::<u64>()
-                            .context("Failed to parse uploaded_variation as u64")?;
-
-                        lines.entry(key).or_default().push(line);
-                    }
+                        let mut vep_job = VepJob::new(&in_tmp, &out_vep, config);
+                        run!(config, &mut vep_job).context("Error while running VEP.")?;
 
-                    fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
+                        let mut reader_vep = ReaderBuilder::new()
+                            .delimiter(b'\t')
+                            .has_headers(false)
+                            .comment(Some(b'#'))
+                            .flexible(true)
+                            .from_reader(
+                                fs::File::open(&out_vep).context("Can't open result file.")?,
+                            );
 
-                    let mut n_not_vep = 0;
-                    let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
+                        let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
+                        for line in reader_vep.deserialize() {
+                            let line: VepLine = line.context("Failed to deserialize VepLine")?;
+                            let key = line
+                                .uploaded_variation
+                                .parse::<u64>()
+                                .context("Failed to parse uploaded_variation as u64")?;
+                            lines.entry(key).or_default().push(line);
+                        }
+
+                        let mut n_not_vep = 0usize;
+                        let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
 
-                    chunk.iter().enumerate().for_each(|(i, entry)| {
-                        let k = (i + 1) as u64;
+                        chunk.iter().enumerate().for_each(|(i, entry)| {
+                            let k = (i + 1) as u64;
 
-                        if let Some(vep_lines) = lines.get(&k) {
-                            if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
-                                chunk_results.push((entry.hash(), veps));
+                            if let Some(vep_lines) = lines.get(&k) {
+                                if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
+                                    chunk_results.push((entry.hash(), veps));
+                                }
+                            } else {
+                                warn!(
+                                    "No entry for {}\t{}\t{}",
+                                    entry.position, entry.reference, entry.alternative
+                                );
+                                n_not_vep += 1;
                             }
-                        } else {
-                            warn!(
-                                "No VEP entry for {}\t{}\t{}",
-                                entry.position, entry.reference, entry.alternative
-                            );
-                            n_not_vep += 1;
+                        });
+
+                        if n_not_vep > 0 && out_warnings.exists() {
+                            debug!("{n_not_vep} entries not annotated.");
+                            let warnings =
+                                fs::read_to_string(&out_warnings).with_context(|| {
+                                    format!("Can't read warnings file {}", out_warnings.display())
+                                })?;
+                            warn!("Warnings:\n{warnings}");
                         }
-                    });
-                    fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
-
-                    if n_not_vep > 0 {
-                        debug!("{n_not_vep} variants not annotated by VEP.");
-                        let warnings = fs::read_to_string(&out_warnings)
-                            .context(format!("Can't read VEP warnings: {out_warnings}"))?;
-                        warn!("VEP warnings:\n{warnings}");
-                    }
-                    if Path::new(&out_warnings).exists() {
-                        fs::remove_file(&out_warnings)
-                            .context(format!("Can't remove file {out_warnings}"))?;
-                    }
-                    if Path::new(&out_summary).exists() {
-                        fs::remove_file(&out_summary)
-                            .context(format!("Can't remove file {out_summary}"))?;
-                    }
-                    Ok(chunk_results)
-                })
+
+                        // Success: remove temps and silence Drop warning.
+                        guard.cleanup();
+                        guard.disarm();
+
+                        Ok(chunk_results)
+                    },
+                )
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
                 .flatten()
-                .collect::<Vec<_>>();
+                .collect();
 
             results.extend(results_sv);
         }
+
         info!("{} total variants annotaded by VEP.", results.len());
 
         for (hash, veps) in results {
-            // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
-
             annotations
                 .store
                 .entry(hash)
@@ -1855,17 +1856,15 @@ fn process_vep_chunk(
     header: &str,
     config: &Config,
 ) -> anyhow::Result<Vec<(Hash128, Vec<VEP>)>> {
-    let in_tmp = temp_file_path("vcf")?
-        .to_str()
-        .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
-        .to_string();
-    let out_vep = temp_file_path("_vep.txt")?
-        .to_str()
-        .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
-        .to_string();
-
-    let out_summary = format!("{out_vep}_summary.html");
-    let out_warnings = format!("{out_vep}_warnings.txt");
+    let mut guard = TempFileGuard::new();
+
+    let in_tmp = guard.tmp_path(".vcf", &config.tmp_dir);
+    let out_vep = guard.tmp_path("_vep.txt", &config.tmp_dir);
+
+    let out_summary = PathBuf::from(format!("{}_summary.html", out_vep.display()));
+    let out_warnings = PathBuf::from(format!("{}_warnings.txt", out_vep.display()));
+    guard.track(out_summary.clone());
+    guard.track(out_warnings.clone());
 
     let mut vcf = File::create(&in_tmp)?;
     writeln!(vcf, "{}", header)?;
@@ -1885,7 +1884,7 @@ fn process_vep_chunk(
     let mut vep_job = VepJob::new(&in_tmp, &out_vep, config);
     if let Err(e) = run!(config, &mut vep_job) {
         error!("VEP error: {e}");
-        return Err(anyhow::anyhow!("VEP execution failed: {}", e)); // Propagate the error.
+        return Err(anyhow::anyhow!("VEP execution failed: {}", e));
     }
 
     let mut reader_vep = ReaderBuilder::new()
@@ -1893,21 +1892,19 @@ fn process_vep_chunk(
         .has_headers(false)
         .comment(Some(b'#'))
         .flexible(true)
-        .from_reader(fs::File::open(&out_vep)?); // If this fails, the error is propagated.
+        .from_reader(fs::File::open(&out_vep)?);
 
     let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
     for line in reader_vep.deserialize() {
-        let line: VepLine = line.context("Failed to deserialize VepLine")?; // Propagate the error.
+        let line: VepLine = line.context("Failed to deserialize VepLine")?;
         let key = line
             .uploaded_variation
             .parse::<u64>()
-            .context("Failed to parse uploaded_variation as u64")?; // Propagate the error.
+            .context("Failed to parse uploaded_variation as u64")?;
         lines.entry(key).or_default().push(line);
     }
 
-    fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
-
-    let mut n_not_vep = 0;
+    let mut n_not_vep = 0usize;
     let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
 
     chunk.iter().enumerate().for_each(|(i, entry)| {
@@ -1925,21 +1922,17 @@ fn process_vep_chunk(
             n_not_vep += 1;
         }
     });
-    fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
 
-    if n_not_vep > 0 {
-        debug!("{n_not_vep} variants not annotated by VEP.");
+    if n_not_vep > 0 && out_warnings.exists() {
+        debug!("{n_not_vep} variants not annotated.");
         let warnings = fs::read_to_string(&out_warnings)
-            .context(format!("Can't read VEP warnings: {out_warnings}"))?;
-        warn!("VEP warnings:\n{warnings}");
+            .with_context(|| format!("Can't read warnings file {}", out_warnings.display()))?;
+        warn!("Warnings:\n{warnings}");
     }
 
-    if Path::new(&out_warnings).exists() {
-        fs::remove_file(&out_warnings).context(format!("Can't remove file {out_warnings}"))?;
-    }
-    if Path::new(&out_summary).exists() {
-        fs::remove_file(&out_summary).context(format!("Can't remove file {out_summary}"))?;
-    }
+    // Success: remove temps and avoid Drop warning.
+    guard.cleanup();
+    guard.disarm();
 
-    Ok(chunk_results) // Return the successful result.
+    Ok(chunk_results)
 }