Thomas 1 month ago
parent
commit
3e63458572
6 changed files with 197 additions and 8 deletions
  1. 14 0
      pandora-config.example.toml
  2. 4 0
      src/config.rs
  3. 15 7
      src/de_novo/de_novo_pipe.rs
  4. 1 0
      src/de_novo/mod.rs
  5. 130 0
      src/de_novo/wtdbg2.rs
  6. 33 1
      src/io/bam.rs

+ 14 - 0
pandora-config.example.toml

@@ -501,6 +501,20 @@ minimap2_threads = 16
 # Can be reduced to 8G for contig→contig or local assembly realignment.
 minimap2_slurm_mem = "32G"
 
+#######################################
+# wtdbg2
+#######################################
+
+# Path to the wtdbg2.pl wrapper script.
+# Handles both assembly (wtdbg2) and consensus (wtpoa-cns) in one call.
+wtdbg2_bin = "/home/t_steimle/somatic_pipe_tools/wtdbg2/wtdbg2.pl"
+
+# Threads for wtdbg2 + wtpoa-cns. 8 is sufficient for local assembly.
+wtdbg2_threads = 8
+
+# Memory for SLURM. wtdbg2 is lightweight — 16G is ample for local assembly.
+wtdbg2_slurm_mem = "16G"
+
 #######################################
 # Marlin
 #######################################

+ 4 - 0
src/config.rs

@@ -541,6 +541,10 @@ pub struct Config {
     pub medaka_threads: u8,
     pub medaka_slurm_mem: String,
     pub medaka_model: String,
+
+    pub wtdbg2_bin: String,
+    pub wtdbg2_threads: u8,
+    pub wtdbg2_slurm_mem: String,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]

+ 15 - 7
src/de_novo/de_novo_pipe.rs

@@ -227,7 +227,7 @@ pub fn run_local_assembly_iterative(
         );
 
         let reads_path = round_dir.join("reads.fastq");
-        let (assembler, polisher) = builder.build(reads_path, &round_dir);
+        let (assembler, polisher) = builder.build(reads_path, &round_dir)?;
 
         results = run_local_assembly(
             &current_records,
@@ -318,11 +318,19 @@ pub type BoxedAssembler = Box<dyn Assembler>;
 pub type BoxedPolisher = Box<dyn Polisher>;
 
 pub trait LocalAssemblyBuilder {
-    fn build(&self, reads_path: PathBuf, round_dir: &Path) -> (BoxedAssembler, BoxedPolisher);
+    fn build(
+        &self,
+        reads_path: PathBuf,
+        round_dir: &Path,
+    ) -> anyhow::Result<(BoxedAssembler, BoxedPolisher)>;
 }
 
 impl LocalAssemblyBuilder for FlyeMedakaBuilder {
-    fn build(&self, reads_path: PathBuf, round_dir: &Path) -> (BoxedAssembler, BoxedPolisher) {
+    fn build(
+        &self,
+        reads_path: PathBuf,
+        round_dir: &Path,
+    ) -> anyhow::Result<(BoxedAssembler, BoxedPolisher)> {
         let assembler = FlyeAssembler::from_config(
             &self.config,
             reads_path.clone(),
@@ -337,7 +345,7 @@ impl LocalAssemblyBuilder for FlyeMedakaBuilder {
             round_dir.join("medaka"),
         );
 
-        (Box::new(assembler), Box::new(polisher))
+        Ok((Box::new(assembler), Box::new(polisher)))
     }
 }
 
@@ -425,7 +433,7 @@ mod tests {
     use env_logger::init;
     use log::info;
 
-    use crate::io::bam::fetch_primary_records;
+    use crate::{de_novo::wtdbg2::Wtdbg2MedakaBuilder, io::bam::fetch_primary_records};
 
     use super::*;
 
@@ -449,7 +457,7 @@ mod tests {
 
         let bam_path = PathBuf::from(config.tumoral_bam(id));
         let work_dir = PathBuf::from(format!(
-            "{}/{id}/{}/asm",
+            "{}/{id}/{}/asm_wtdbg2",
             config.result_dir, config.tumoral_name
         ));
         if work_dir.exists() {
@@ -479,7 +487,7 @@ mod tests {
         let results = run_local_assembly_iterative(
             &bam_path,
             records,
-            &FlyeMedakaBuilder {
+            &Wtdbg2MedakaBuilder {
                 config: config.clone(),
             },
             work_dir.clone(),

+ 1 - 0
src/de_novo/mod.rs

@@ -1,6 +1,7 @@
 pub mod de_novo_pipe;
 pub mod flye;
 pub mod medaka;
+pub mod wtdbg2;
 
 use crate::commands::{Command as JobCommand, LocalRunner, SbatchRunner, SlurmRunner};
 use std::path::{Path, PathBuf};

+ 130 - 0
src/de_novo/wtdbg2.rs

@@ -0,0 +1,130 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::Context;
+
+use crate::{
+    commands::{Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner},
+    config::Config,
+    de_novo::{
+        de_novo_pipe::{BoxedAssembler, BoxedPolisher, LocalAssemblyBuilder},
+        medaka::MedakaPolisher,
+        Assembler,
+    },
+};
+
+pub struct Wtdbg2Assembler {
+    pub reads: PathBuf,
+    pub out_dir: PathBuf,
+    pub genome_size: String, // e.g. "500k"
+    pub threads: u8,
+    pub wtdbg2_bin: String,
+    pub slurm_mem: String,
+    pub prefix: PathBuf, // out_dir/asm
+}
+
+impl Wtdbg2Assembler {
+    pub fn from_config(
+        config: &Config,
+        reads: PathBuf,
+        out_dir: PathBuf,
+        genome_size: String,
+    ) -> anyhow::Result<Self> {
+        std::fs::create_dir_all(&out_dir)
+            .with_context(|| format!("Cannot create wtdbg2 output dir: {}", out_dir.display()))?;
+        let prefix = out_dir.join("asm");
+        Ok(Self {
+            reads,
+            out_dir,
+            genome_size,
+            threads: config.wtdbg2_threads,
+            wtdbg2_bin: config.wtdbg2_bin.clone(),
+            slurm_mem: config.wtdbg2_slurm_mem.clone(),
+            prefix,
+        })
+    }
+}
+
+impl JobCommand for Wtdbg2Assembler {
+    fn cmd(&self) -> String {
+        let wtdbg2_dir = PathBuf::from(&self.wtdbg2_bin)
+            .parent()
+            .map(|p| p.display().to_string())
+            .unwrap_or_default();
+
+        format!(
+            r#"set -euo pipefail
+PATH={wtdbg2_dir}:${{PATH}} && {wtdbg2} -P -x ont -g {genome_size} -t {threads} -o {prefix} {reads}
+{wtdbg2_dir}/wtpoa-cns -t {threads} -i {prefix}.ctg.lay.gz -fo {fasta}
+"#,
+            wtdbg2_dir = wtdbg2_dir,
+            wtdbg2 = self.wtdbg2_bin,
+            genome_size = self.genome_size,
+            threads = self.threads,
+            prefix = self.prefix.display(),
+            reads = self.reads.display(),
+            fasta = self.assembly_fasta().display(),
+        )
+    }
+}
+
+impl Assembler for Wtdbg2Assembler {
+    fn reads_input(&self) -> &Path {
+        &self.reads
+    }
+    fn output_dir(&self) -> &Path {
+        &self.out_dir
+    }
+    fn assembly_fasta(&self) -> PathBuf {
+        self.prefix.with_extension("asm.cns.fa")
+    }
+    fn assembly_graph(&self) -> Option<PathBuf> {
+        // wtdbg2 produces a .dot graph
+        Some(self.prefix.with_extension("dot"))
+    }
+}
+
+impl SbatchRunner for Wtdbg2Assembler {
+    fn slurm_params(&self) -> SlurmParams {
+        SlurmParams {
+            job_name: Some("wtdbg2".to_string()),
+            cpus_per_task: Some(self.threads.into()),
+            mem: Some(self.slurm_mem.clone()),
+            partition: Some("shortq".to_string()),
+            gres: None,
+        }
+    }
+}
+
+impl SlurmRunner for Wtdbg2Assembler {
+    fn slurm_args(&self) -> Vec<String> {
+        self.slurm_params().to_args()
+    }
+}
+
+impl LocalRunner for Wtdbg2Assembler {}
+
+pub struct Wtdbg2MedakaBuilder {
+    pub config: Config,
+}
+
+impl LocalAssemblyBuilder for Wtdbg2MedakaBuilder {
+    fn build(
+        &self,
+        reads_path: PathBuf,
+        round_dir: &Path,
+    ) -> anyhow::Result<(BoxedAssembler, BoxedPolisher)> {
+        let assembler = Wtdbg2Assembler::from_config(
+            &self.config,
+            reads_path.clone(),
+            round_dir.join("wtdbg2"),
+            "10k".to_string(),
+        )?;
+        let polisher = MedakaPolisher::from_config(
+            &self.config,
+            reads_path,
+            assembler.assembly_fasta(),
+            round_dir.join("medaka"),
+        );
+        Ok((Box::new(assembler), Box::new(polisher)))
+    }
+}

+ 33 - 1
src/io/bam.rs

@@ -1004,6 +1004,32 @@ pub fn fetch_primary_records(
 }
 
 /// Query-name filter for [`fetch_primary_records`].
+pub fn query_aligned_span(record: &rust_htslib::bam::Record) -> (usize, usize) {
+    let cigar = record.cigar();
+
+    let qstart = cigar
+        .iter()
+        .take_while(|c| matches!(c, Cigar::SoftClip(_) | Cigar::HardClip(_)))
+        .filter_map(|c| match c {
+            Cigar::SoftClip(len) => Some(*len as usize),
+            _ => None,
+        })
+        .sum::<usize>();
+
+    let qend = record.seq_len()
+        - cigar
+            .iter()
+            .rev()
+            .take_while(|c| matches!(c, Cigar::SoftClip(_) | Cigar::HardClip(_)))
+            .filter_map(|c| match c {
+                Cigar::SoftClip(len) => Some(*len as usize),
+                _ => None,
+            })
+            .sum::<usize>();
+
+    (qstart, qend)
+}
+
 pub enum QnameFilter {
     /// Retain only records whose query name is in the set.
     Whitelist(HashSet<Vec<u8>>),
@@ -1059,7 +1085,13 @@ pub fn bam_to_aligned_bed<P: AsRef<Path>>(input_bam: P, output_bed: P) -> anyhow
         if end > start {
             let qname = std::str::from_utf8(record.qname())?;
 
-            writeln!(writer, "{chrom}\t{start}\t{end}\t{qname}")?;
+            let (qstart, qend) = query_aligned_span(&record);
+
+            writeln!(
+                writer,
+                "{chrom}\t{start}\t{end}\t{}:{}-{}",
+                qname, qstart, qend
+            )?;
         }
     }