浏览代码

nanomonsv MA

Thomas 14 小时之前
父节点
当前提交
244ff9366f
共有 4 个文件被更改,包括 321 次插入47 次删除
  1. 6 1
      pandora-config.example.toml
  2. 213 46
      src/callers/nanomonsv.rs
  3. 87 0
      src/commands/modkit.rs
  4. 15 0
      src/config.rs

+ 6 - 1
pandora-config.example.toml

@@ -322,6 +322,12 @@ straglr_solo_output_dir = "{result_dir}/{id}/{time}/straglr"
 # Force Straglr recomputation.
 straglr_force = false
 
+#######################################
+# Marlin
+#######################################
+
+marlin_bed = "/home/t_steimle/ref/hs1/marlin_v1.probes_t2t.bed"
+
 #######################################
 # Bcftools configuration
 #######################################
@@ -413,7 +419,6 @@ promethion_runs_metadata_dir = "/data/promethion-runs-metadata"
 # JSON file mapping flowcell IDs / runs for Pandora.
 promethion_runs_input = "/data/pandora-flowcell-id.json"
 
-
 #######################################
 # Alignment / basecalling (Dorado)
 #######################################

+ 213 - 46
src/callers/nanomonsv.rs

@@ -66,12 +66,13 @@
 //!
 //! ## References
 //!
-//! - [NanomonSV GitHub](https://github.com/friend1ws/nanomonsv)
+//! - [NanomonSV uitHub](https://github.com/friend1ws/nanomonsv)
 //! - [NanomonSV Paper](https://doi.org/10.1186/s13059-020-02175-y)
 use rayon::prelude::*;
 use regex::Regex;
 use std::{
-    fs::{self},
+    fs::{self, File},
+    io::{BufRead, BufReader, BufWriter, Write},
     path::{Path, PathBuf},
     thread,
 };
@@ -141,7 +142,7 @@ impl LocalRunner for NanomonSV {}
 
 impl SlurmRunner for NanomonSV {
     fn slurm_args(&self) -> Vec<String> {
-        let mem = self.memory.unwrap_or(40);
+        let mem = self.memory.unwrap_or(20);
         SlurmParams {
             job_name: Some(format!("nanomonsv_{}", self.id)),
             cpus_per_task: Some(self.threads as u32),
@@ -154,7 +155,7 @@ impl SlurmRunner for NanomonSV {
 }
 impl SbatchRunner for NanomonSV {
     fn slurm_params(&self) -> SlurmParams {
-        let mem = self.memory.unwrap_or(40);
+        let mem = self.memory.unwrap_or(20);
         SlurmParams {
             job_name: Some(format!("nanomonsv_{}", self.id)),
             cpus_per_task: Some(self.threads as u32),
@@ -198,8 +199,16 @@ impl Initialize for NanomonSV {
         };
 
         if nanomonsv.config.nanomonsv_force {
-            remove_dir_if_exists(&nanomonsv.config.nanomonsv_output_dir(&nanomonsv.id, "diag"))?;
-            remove_dir_if_exists(&nanomonsv.config.nanomonsv_output_dir(&nanomonsv.id, "mrd"))?;
+            remove_dir_if_exists(
+                &nanomonsv
+                    .config
+                    .nanomonsv_output_dir(&nanomonsv.id, &config.tumoral_name),
+            )?;
+            remove_dir_if_exists(
+                &nanomonsv
+                    .config
+                    .nanomonsv_output_dir(&nanomonsv.id, &config.normal_name),
+            )?;
         }
 
         Ok(nanomonsv)
@@ -214,12 +223,14 @@ impl ShouldRun for NanomonSV {
     /// `true` if the passed VCF does not exist or is older than any input BAM.
     fn should_run(&self) -> bool {
         let passed_vcf = self.config.nanomonsv_passed_vcf(&self.id);
-        let mrd_out_dir = self.config.nanomonsv_output_dir(&self.id, "mrd");
-        let mrd_out_prefix = format!("{}/{}_mrd", mrd_out_dir, self.id);
-        let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
+        let norm_out_dir = self
+            .config
+            .nanomonsv_output_dir(&self.id, &self.config.normal_name);
+        let norm_out_prefix = format!("{}/{}_{}", norm_out_dir, self.id, self.config.normal_name);
+        let norm_info_vcf = format!("{norm_out_prefix}.bp_info.sorted.bed.gz");
 
         let result = [
-            is_file_older(&mrd_info_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true),
+            is_file_older(&norm_info_vcf, &self.config.normal_bam(&self.id), true).unwrap_or(true),
             is_file_older(&passed_vcf, &self.config.tumoral_bam(&self.id), true).unwrap_or(true),
         ]
         .iter()
@@ -264,8 +275,12 @@ impl Run for NanomonSV {
         let diag_bam = self.config.tumoral_bam(&self.id);
         let normal_bam = self.config.normal_bam(&self.id);
 
-        let diag_out_dir = self.config.nanomonsv_output_dir(&self.id, &self.config.tumoral_name);
-        let normal_out_dir = self.config.nanomonsv_output_dir(&self.id, &self.config.normal_name);
+        let diag_out_dir = self
+            .config
+            .nanomonsv_output_dir(&self.id, &self.config.tumoral_name);
+        let normal_out_dir = self
+            .config
+            .nanomonsv_output_dir(&self.id, &self.config.normal_name);
         let vcf_passed = self.config.nanomonsv_passed_vcf(&self.id);
 
         fs::create_dir_all(&diag_out_dir)?;
@@ -282,7 +297,8 @@ impl Run for NanomonSV {
         )?;
 
         let diag_out_prefix = format!("{}/{}_{}", diag_out_dir, self.id, self.config.tumoral_name);
-        let normal_out_prefix = format!("{}/{}_{}", normal_out_dir, self.id, self.config.normal_name);
+        let normal_out_prefix =
+            format!("{}/{}_{}", normal_out_dir, self.id, self.config.normal_name);
 
         let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
         let normal_result_vcf = format!("{normal_out_prefix}.nanomonsv.result.vcf");
@@ -291,9 +307,12 @@ impl Run for NanomonSV {
             info!("Nanomonsv get from normal bam: {normal_bam}.");
             let report = nanomonsv_get(&normal_bam, &normal_out_prefix, None, None, &self.config)
                 .context(format!(
-                    "Error while running NanomonSV get for {normal_result_vcf}"
-                ))?;
-            report.save_to_file(format!("{}/nanomonsv_get_{}_", self.log_dir, self.config.normal_name))?;
+                "Error while running NanomonSV get for {normal_result_vcf}"
+            ))?;
+            report.save_to_file(format!(
+                "{}/nanomonsv_get_{}_",
+                self.log_dir, self.config.normal_name
+            ))?;
         } else {
             debug!(
                 "NanomonSV `get` results already exists for {} normal, skipping execution.",
@@ -313,7 +332,10 @@ impl Run for NanomonSV {
             .context(format!(
                 "Error while running NanomonSV get for {diag_result_vcf}"
             ))?;
-            report.save_to_file(format!("{}/nanomonsv_get_{}_", self.log_dir, self.config.tumoral_name))?;
+            report.save_to_file(format!(
+                "{}/nanomonsv_get_{}_",
+                self.log_dir, self.config.tumoral_name
+            ))?;
         } else {
             debug!(
                 "NanomonSV `get` results already exists for {} tumoral, skipping execution.",
@@ -322,6 +344,9 @@ impl Run for NanomonSV {
         }
 
         if !Path::new(&vcf_passed).exists() {
+            // Nanomonsv BUG v0.8.0
+            patch_simple_repeat_header(&diag_result_vcf)
+                .context("Failed to patch Header file for Simple Repeat filter tag")?;
             let mut keep =
                 BcftoolsKeepPass::from_config(&self.config, &diag_result_vcf, &vcf_passed);
             let report =
@@ -654,29 +679,36 @@ pub fn nanomonsv_parse(
 /// # Arguments
 /// * `id` - Sample ID
 /// * `diag_bam` - Path to diagnostic (tumoral) BAM file
-/// * `mrd_bam` - Path to matched normal BAM file
+/// * `norm_bam` - Path to matched normal BAM file
 /// * `diag_out_dir` - Output directory for diagnostic BAM
-/// * `mrd_out_dir` - Output directory for MRD BAM
+/// * `norm_out_dir` - Output directory for MRD BAM
 /// * `config` - Shared pipeline configuration
 /// * `log_dir` - Log output directory
 fn somatic_parse(
     id: &str,
     diag_bam: &str,
-    mrd_bam: &str,
+    norm_bam: &str,
     diag_out_dir: &str,
-    mrd_out_dir: &str,
+    norm_out_dir: &str,
     config: &Config,
     log_dir: &str,
 ) -> anyhow::Result<()> {
-    let diag_out_prefix = format!("{diag_out_dir}/{id}_diag");
-    let mrd_out_prefix = format!("{mrd_out_dir}/{id}_mrd");
+    let tumoral_out_prefix = format!("{diag_out_dir}/{id}_diag");
+    let norm_out_prefix = format!("{norm_out_dir}/{id}_norm"); // TODO: will break if norm
+                                                               // name change
 
-    let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
-    let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
+    let diag_info_vcf = format!("{tumoral_out_prefix}.bp_info.sorted.bed.gz");
+    let norm_info_vcf = format!("{norm_out_prefix}.bp_info.sorted.bed.gz");
 
     let threads_handles = vec![
-        spawn_parse_thread(diag_bam, &diag_out_prefix, config, log_dir, &diag_info_vcf)?,
-        spawn_parse_thread(mrd_bam, &mrd_out_prefix, config, log_dir, &mrd_info_vcf)?,
+        spawn_parse_thread(
+            diag_bam,
+            &tumoral_out_prefix,
+            config,
+            log_dir,
+            &diag_info_vcf,
+        )?,
+        spawn_parse_thread(norm_bam, &norm_out_prefix, config, log_dir, &norm_info_vcf)?,
     ];
 
     for handle in threads_handles {
@@ -767,7 +799,7 @@ pub fn nanomonsv_get(
         config: config.clone(),
         job_args: args,
         threads: config.nanomonsv_threads,
-        memory: Some(40),
+        memory: Some(20),
     };
     run!(config, &mut runner)
 }
@@ -779,8 +811,8 @@ pub fn nanomonsv_get(
 /// # Errors
 /// Returns an error if directory traversal, filtering, or concatenation fails.
 pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<()> {
-    let mut passed_mrd = Vec::new();
-    for mrd_dir in find_nanomonsv_dirs(
+    let mut passed_normal = Vec::new();
+    for normal_dir in find_nanomonsv_dirs(
         &PathBuf::from(&config.result_dir),
         &config.normal_name,
         0,
@@ -790,25 +822,29 @@ pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<(
         let mut passed_csi = None;
         let mut result = None;
 
-        for entry in fs::read_dir(&mrd_dir)
-            .with_context(|| format!("Failed to read directory: {}", mrd_dir.display()))?
+        for entry in fs::read_dir(&normal_dir)
+            .with_context(|| format!("Failed to read directory: {}", normal_dir.display()))?
         {
-            let entry =
-                entry.with_context(|| format!("Failed to read entry in: {}", mrd_dir.display()))?;
+            let entry = entry
+                .with_context(|| format!("Failed to read entry in: {}", normal_dir.display()))?;
             let file_name = entry.file_name().to_string_lossy().to_string();
             let path = entry.path();
 
-            if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz") {
+            if file_name.ends_with(&format!("_{}_nanomonsv_PASSED.vcf.gz", config.normal_name)) {
                 passed = Some(path);
-            } else if file_name.ends_with("_mrd_nanomonsv_PASSED.vcf.gz.csi") {
+            } else if file_name.ends_with(&format!(
+                "_{}_nanomonsv_PASSED.vcf.gz.csi",
+                config.normal_name
+            )) {
                 passed_csi = Some(path);
-            } else if file_name.ends_with("_mrd.nanomonsv.result.vcf") {
+            } else if file_name.ends_with(&format!("_{}.nanomonsv.result.vcf", config.normal_name))
+            {
                 result = Some(path);
             }
         }
 
         match (result, passed.clone(), passed_csi) {
-            (None, None, None) => error!("No result in {}", mrd_dir.display()),
+            (None, None, None) => error!("No result in {}", normal_dir.display()),
             (Some(p), None, None) => {
                 let output = replace_filename_suffix(
                     &p,
@@ -821,19 +857,19 @@ pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<(
                 if let Err(r) = run!(config, &mut keep) {
                     error!("{r}");
                 } else {
-                    passed_mrd.push(output);
+                    passed_normal.push(output);
                 }
             }
             (Some(_), Some(p), None) => warn!("Processing csi for {}", p.display()),
-            (Some(_), Some(p), Some(_)) => passed_mrd.push(p),
+            (Some(_), Some(p), Some(_)) => passed_normal.push(p),
             _ => {} // All files found
         }
     }
 
-    println!("{} vcf to concat", passed_mrd.len());
+    println!("{} vcf to concat", passed_normal.len());
     let mut concat = BcftoolsConcat::from_config(
         config,
-        passed_mrd
+        passed_normal
             .iter()
             .map(|p| p.to_string_lossy().to_string())
             .collect(),
@@ -848,7 +884,7 @@ pub fn nanomonsv_create_pon(config: &Config, pon_path: &str) -> anyhow::Result<(
 ///
 /// # Arguments
 /// * `root` - Starting directory for search
-/// * `time_point` - Time point identifier (e.g., "mrd")
+/// * `time_point` - Time point identifier (e.g., "norm")
 /// * `depth` - Current recursion depth
 /// * `max_depth` - Maximum recursion depth
 pub fn find_nanomonsv_dirs(
@@ -896,9 +932,9 @@ pub fn find_nanomonsv_dirs(
 ///
 /// # Example
 /// ```
-/// let path = Path::new("/data/sample_mrd.nanomonsv.result.vcf");
-/// let new_path = replace_filename_suffix(path, "_mrd.nanomonsv.result.vcf", "_mrd_PASSED.vcf.gz");
-/// // new_path: /data/sample_mrd_PASSED.vcf.gz
+/// let path = Path::new("/data/sample_norm.nanomonsv.result.vcf");
+/// let new_path = replace_filename_suffix(path, "_norm.nanomonsv.result.vcf", "_norm_PASSED.vcf.gz");
+/// // new_path: /data/sample_norm_PASSED.vcf.gz
 /// ```
 pub fn replace_filename_suffix(path: &Path, from: &str, to: &str) -> PathBuf {
     let file_name = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
@@ -908,6 +944,137 @@ pub fn replace_filename_suffix(path: &Path, from: &str, to: &str) -> PathBuf {
     path.with_file_name(new_file_name)
 }
 
+/// Patch a VCF **in place** by ensuring the FILTER `Simple_repeat`
+/// is defined in the header.
+///
+/// This function is a thin, safe wrapper around
+/// [`patch_simple_repeat_filter_header`]. It writes a patched copy
+/// to a temporary file in the **same directory** and then atomically
+/// replaces the original file using `rename(2)`.
+///
+/// Guarantees:
+/// - No partial writes to the original file
+/// - Atomic replacement on POSIX filesystems
+/// - Original file remains intact if an error occurs
+///
+/// # Parameters
+/// - `vcf_path`: path to the `.vcf` file to patch
+///
+/// # Errors
+/// Returns any I/O error encountered while patching or renaming.
+///
+/// # Example
+/// ```no_run
+/// patch_simple_repeat_filter_header_in_place("input.vcf")?;
+/// ```
+pub fn patch_simple_repeat_header<P: AsRef<Path>>(vcf_path: P) -> anyhow::Result<()> {
+    let vcf_path = vcf_path.as_ref();
+
+    let mut tmp: PathBuf = vcf_path.into();
+    tmp.set_extension("vcf.tmp");
+
+    patch_simple_repeat_filter_header(vcf_path, &tmp)?;
+
+    fs::rename(&tmp, vcf_path)?;
+
+    Ok(())
+}
+
+/// Ensure the VCF header defines the FILTER `Simple_repeat`.
+///
+/// This function reads a **plain text VCF** (not gzipped) and writes a patched
+/// copy where the following header line is inserted **once**, immediately
+/// before the `#CHROM` line, **if and only if** it is missing:
+///
+/// ```text
+/// ##FILTER=<ID=Simple_repeat,Description="Simple repeat region">
+/// ```
+///
+/// Behaviour:
+/// - Header lines (`##...`) are copied verbatim.
+/// - If a `##FILTER=<ID=Simple_repeat,...>` line already exists, the file is
+///   copied unchanged.
+/// - Variant records are streamed unchanged (no buffering of the body).
+/// - If `#CHROM` is encountered without the FILTER being defined, the line is
+///   injected just before it.
+/// - If no `#CHROM` line exists, the file is passed through unchanged.
+///
+/// This patch is required to avoid `bcftools view -i 'FILTER="PASS"'` failing
+/// on VCFs that contain `FILTER=Simple_repeat` in records but lack the
+/// corresponding header definition.
+///
+/// # Parameters
+/// - `input_vcf`: path to the input `.vcf`
+/// - `output_vcf`: path to the patched `.vcf`
+///
+/// # Errors
+/// Returns any I/O error encountered while reading or writing the file.
+///
+/// # Example
+/// ```no_run
+/// patch_simple_repeat_filter_header(
+///     "input.vcf",
+///     "input.header_fixed.vcf",
+/// )?;
+/// ```
+///
+/// # Notes
+/// - This function intentionally does **not** validate records.
+/// - No external tools (`bcftools`) are invoked.
+/// - The function is deterministic and idempotent.
+pub fn patch_simple_repeat_filter_header<P: AsRef<Path>>(
+    input_vcf: P,
+    output_vcf: P,
+) -> anyhow::Result<()> {
+    let input_vcf = input_vcf.as_ref();
+    let output_vcf = output_vcf.as_ref();
+
+    let mut reader = BufReader::new(File::open(input_vcf)?);
+    let mut writer = BufWriter::new(File::create(output_vcf)?);
+
+    const NEEDLE: &str = "##FILTER=<ID=Simple_repeat";
+    const INSERT_LINE: &str = "##FILTER=<ID=Simple_repeat,Description=\"Simple repeat region\">\n";
+
+    let mut line = String::new();
+    let mut saw_simple_repeat = false;
+    let mut inserted = false;
+
+    loop {
+        line.clear();
+        let n = reader.read_line(&mut line)?;
+        if n == 0 {
+            break;
+        }
+
+        if line.starts_with("##") {
+            if line.starts_with(NEEDLE) {
+                saw_simple_repeat = true;
+            }
+            writer.write_all(line.as_bytes())?;
+            continue;
+        }
+
+        if line.starts_with("#CHROM") {
+            if !saw_simple_repeat {
+                writer.write_all(INSERT_LINE.as_bytes())?;
+                inserted = true;
+            }
+            writer.write_all(line.as_bytes())?;
+
+            // Copy the rest of the file unchanged
+            std::io::copy(&mut reader, &mut writer)?;
+            break;
+        }
+
+        // If weird pre-header content exists, just pass through
+        writer.write_all(line.as_bytes())?;
+    }
+
+    writer.flush()?;
+    let _ = inserted;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

+ 87 - 0
src/commands/modkit.rs

@@ -169,6 +169,81 @@ where
     T::from_str(value).context(format!("Failed to parse {} value", key))
 }
 
+pub struct ModkitPileupMarlin {
+    id: String,
+    time: String,
+    config: Config,
+}
+impl InitializeSolo for ModkitPileupMarlin {
+    fn initialize(id: &str, time: &str, config: &Config) -> anyhow::Result<Self> {
+        Ok(Self {
+            id: id.to_string(),
+            time: time.to_string(),
+
+            config: config.clone(),
+        })
+    }
+}
+
+impl Command for ModkitPileupMarlin {
+    fn cmd(&self) -> String {
+        let hp_bam = self.config.solo_haplotagged_bam(&self.id, &self.time);
+        let bam = self.config.solo_bam(&self.id, &self.time);
+        let out = format!("{}/{}_{}_MARLIN.bed", self.config.solo_dir(&self.id, &self.time), &self.id, &self.time);
+        let bam = if Path::new(&hp_bam).exists() {
+            hp_bam
+        } else {
+            bam
+        };
+        format!(
+            "{modkit} pileup {bam} {out} -t {threads} --combine-mods --only-tabs --ref {reference} --include-bed {marlin_bed}",
+            modkit = self.config.modkit_bin,
+            bam = bam,
+            out = out,
+            threads = self.config.modkit_summary_threads,
+            reference = self.config.reference,
+            marlin_bed = self.config.marlin_bed,
+        )
+    }
+}
+
+impl LocalRunner for ModkitPileupMarlin {}
+impl SlurmRunner for ModkitPileupMarlin {
+    fn slurm_args(&self) -> Vec<String> {
+        SlurmParams {
+            job_name: Some(format!("modkit_pileup_{}_{}", self.id, self.time)),
+            cpus_per_task: Some(self.config.modkit_summary_threads.into()),
+            mem: Some("40G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+        .to_args()
+    }
+}
+impl SbatchRunner for ModkitPileupMarlin {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some(format!("modkit_pileup_{}_{}", self.id, self.time)),
+            cpus_per_task: Some(self.config.modkit_summary_threads.into()),
+            mem: Some("40G".into()),
+            partition: Some("shortq".into()),
+            gres: None,
+        }
+    }
+}
+
+impl ModkitPileupMarlin {
+    pub fn run(&mut self) -> anyhow::Result<()> {
+        let report = run!(&self.config, self)?;
+        let log_prefix = format!("{}/{}/log/modkit_pileup_{}", self.config.result_dir, self.id, self.time);
+        report.save_to_file(&log_prefix).context(format!(
+            "Error while writing Severus logs into {log_prefix}"
+        ))?;
+        Ok(())
+    }
+}
+
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -191,6 +266,18 @@ mod tests {
         caller.run()?;
 
 
+        Ok(())
+    }
+
+    #[test]
+    fn modkit_marlin() -> anyhow::Result<()> {
+        test_init();
+        let config = Config::default();
+
+        let mut caller = ModkitPileupMarlin::initialize("DUMCO", "diag", &config)?;
+        caller.run()?;
+
+
         Ok(())
     }
 }

+ 15 - 0
src/config.rs

@@ -258,6 +258,9 @@ pub struct Config {
     /// Placeholders: `{result_dir}`, `{id}`, `{time}`.
     pub severus_solo_output_dir: String,
 
+    // === MARLIN ===
+    pub marlin_bed: String,
+
     // === Bcftools configuration ===
     /// Path to Bcftools binary.
     pub bcftools_bin: String,
@@ -532,6 +535,18 @@ impl Config {
         )
     }
 
+    /// Tumor haplotagged BAM.
+    pub fn solo_haplotagged_bam(&self, id: &str, time: &str) -> String {
+        format!(
+            "{}/{}_{}_{}_{}.bam",
+            self.solo_dir(id, time),
+            id,
+            time,
+            self.reference_name,
+            self.haplotagged_bam_tag_name
+        )
+    }
+
     /// Tumor haplotagged BAM.
     pub fn tumoral_haplotagged_bam(&self, id: &str) -> String {
         format!(