Browse Source

vcf tasks

Thomas 1 year ago
parent
commit
86dfefc30a
6 changed files with 384 additions and 34 deletions
  1. 14 27
      src/callers/clairs.rs
  2. 1 0
      src/callers/mod.rs
  3. 280 0
      src/callers/nanomonsv.rs
  4. 75 4
      src/lib.rs
  5. 13 2
      src/runners.rs
  6. 1 1
      src/vcf.rs

+ 14 - 27
src/callers/clairs.rs

@@ -1,9 +1,6 @@
-use crate::{
-    commands::bcftools::{Bcftools, BcftoolsConfig},
-    runners::DockerRun,
-};
 use std::{fs, path::Path};
 use tracing::info;
+use crate::{commands::bcftools::{Bcftools, BcftoolsConfig}, runners::DockerRun};
 
 #[derive(Debug)]
 pub struct ClairSConfig {
@@ -27,7 +24,7 @@ impl Default for ClairSConfig {
 #[derive(Debug)]
 pub struct ClairS {
     pub id: String,
-    pub result_dir: String,
+    pub output_dir: String,
     pub output_vcf: String,
     pub output_indel: String,
     pub vcf_passed: String,
@@ -40,15 +37,15 @@ pub struct ClairS {
 
 impl ClairS {
     pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: ClairSConfig) -> Self {
-        let result_dir = format!("{}/{}/diag/ClairS", config.result_dir, id);
-        let output_vcf = format!("{result_dir}/output.vcf.gz");
-        let output_indel = format!("{result_dir}/indel.vcf.gz");
-        let vcf_passed = format!("{result_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
-        let indel_vcf_passed = format!("{result_dir}/${id}_diag_clairs_indel_PASSED.vcf.gz");
+        let output_dir = format!("{}/{}/diag/ClairS", config.result_dir, id);
+        let output_vcf = format!("{output_dir}/output.vcf.gz");
+        let output_indel = format!("${output_dir}/indel.vcf.gz");
+        let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
+        let indel_vcf_passed = format!("${output_dir}/${id}_diag_clairs_indel_PASSED.vcf.gz");
 
         Self {
             id: id.to_string(),
-            result_dir,
+            output_dir,
             output_vcf,
             vcf_passed,
             diag_bam: diag_bam.to_string(),
@@ -62,8 +59,8 @@ impl ClairS {
 
     pub fn run(&mut self) {
         // Create out dir
-        if !Path::new(&self.result_dir).exists() {
-            fs::create_dir_all(&self.result_dir).expect("Failed to create output directory");
+        if !Path::new(&self.output_dir).exists() {
+            fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
         } else {
             info!("ClairS output directory exists");
         }
@@ -76,7 +73,7 @@ impl ClairS {
                 "-v",
                 "/data:/data",
                 "-v",
-                &format!("{}:{}", self.result_dir, self.result_dir),
+                &format!("{}:{}", self.output_dir, self.output_dir),
                 "hkubal/clairs:latest",
                 "/opt/bin/run_clairs",
                 "--tumor_bam_fn",
@@ -92,7 +89,7 @@ impl ClairS {
                 "--enable_indel_calling",
                 "--include_all_ctgs",
                 "--output_dir",
-                &self.result_dir,
+                &self.output_dir,
                 "-s",
                 &format!("{}_diag", self.id),
             ]);
@@ -104,20 +101,10 @@ impl ClairS {
 
         // Keep PASS
         if !Path::new(&self.vcf_passed).exists() {
-            Bcftools::new(
-                &self.output_vcf,
-                &self.vcf_passed,
-                BcftoolsConfig::default(),
-            )
-            .keep_pass();
+            Bcftools::new(&self.output_vcf, &self.vcf_passed, BcftoolsConfig::default()).keep_pass();
         }
         if !Path::new(&self.indel_vcf_passed).exists() {
-            Bcftools::new(
-                &self.output_indel,
-                &self.indel_vcf_passed,
-                BcftoolsConfig::default(),
-            )
-            .keep_pass();
+            Bcftools::new(&self.output_indel, &self.indel_vcf_passed, BcftoolsConfig::default()).keep_pass();
         }
     }
 }

+ 1 - 0
src/callers/mod.rs

@@ -1,3 +1,4 @@
 pub mod deep_variant;
 pub mod clairs;
+pub mod nanomonsv;
 

+ 280 - 0
src/callers/nanomonsv.rs

@@ -0,0 +1,280 @@
+// reference="/data/ref/hs1/chm13v2.0.fa"
+// bcftools="/data/tools/bcftools-1.18/bcftools"
+//
+// DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
+// MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
+//
+// DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
+// MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
+//
+// DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
+// MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
+//
+// DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
+// MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
+//
+// if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
+//   mkdir "$DIAG_OUTPUT_DIR"
+// fi
+//
+// if [ ! -d "$MRD_OUTPUT_DIR" ]; then
+//   mkdir "$MRD_OUTPUT_DIR"
+// fi
+//
+// # PARSE
+// if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
+//   nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
+// fi
+//
+// if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
+//   echo "[pipe] NanomonSV parse ${MRD_BAM}"
+//   nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
+// fi
+//
+// # GET
+// if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
+//   nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
+//     --control_bam "$MRD_BAM" \
+//     --processes 155 \
+//     "$DIAG_OUT_PREFIX" \
+//     "$DIAG_BAM" "$reference"
+// fi
+//
+// # GET MRD
+// if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
+//   nanomonsv get \
+//     --processes 155 \
+//     "$MRD_OUT_PREFIX" \
+//     "$MRD_BAM" "$reference"
+// fi
+//
+// vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
+// if [ ! -f "$vcf_passed" ]; then
+//   $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
+//     $bcftools view -s "TUMOR" --write-index --threads 20 \
+//     -i "FILTER='PASS'" \
+//     -o "$vcf_passed"
+// fi
+
+use std::{
+    fs,
+    io::{BufRead, BufReader},
+    path::Path,
+    process::{Command, Stdio},
+    sync::{Arc, Mutex},
+    thread,
+};
+
+use log::info;
+
+use crate::commands::bcftools::{Bcftools, BcftoolsConfig};
+
+#[derive(Debug)]
+pub struct NanomonSVConfig {
+    pub bin: String,
+    pub result_dir: String,
+    pub reference: String,
+    pub thread: u8,
+}
+
+impl Default for NanomonSVConfig {
+    fn default() -> Self {
+        Self {
+            bin: "nanomonsv".to_string(),
+            result_dir: "/data/longreads_basic_pipe".to_string(),
+            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+            thread: 155,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct NanomonSV {
+    pub id: String,
+    pub diag_bam: String,
+    pub mrd_bam: String,
+    pub diag_out_dir: String,
+    pub mrd_out_dir: String,
+    pub log: String,
+    pub config: NanomonSVConfig,
+}
+
+impl NanomonSV {
+    pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
+        let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
+        let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
+
+        NanomonSV {
+            id: id.to_string(),
+            diag_bam: diag_bam.to_string(),
+            mrd_bam: mrd_bam.to_string(),
+            diag_out_dir,
+            mrd_out_dir,
+            log: String::default(),
+            config,
+        }
+    }
+
+    pub fn run(&mut self) {
+        // Create direcories
+        if !Path::new(&self.diag_out_dir).exists() {
+            fs::create_dir_all(&self.diag_out_dir).unwrap();
+        }
+        if !Path::new(&self.mrd_out_dir).exists() {
+            fs::create_dir_all(&self.mrd_out_dir).unwrap();
+        }
+
+        // Parse
+        let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
+        let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
+
+        let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
+        let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
+
+        let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
+
+        let mut threads_handles = Vec::new();
+        if !Path::new(&diag_info_vcf).exists() {
+            let bin = self.config.bin.clone();
+            let reference = self.config.reference.clone();
+            let diag_bam = self.diag_bam.clone();
+            let log = log.clone();
+            let diag_out_prefix = diag_out_prefix.clone();
+
+            let handle = thread::spawn(move || {
+                let mut child = Command::new(bin)
+                    .args([
+                        "parse",
+                        "--reference_fasta",
+                        &reference,
+                        &diag_bam,
+                        &diag_out_prefix,
+                    ])
+                    .stdout(Stdio::piped())
+                    .stderr(Stdio::inherit())
+                    .spawn()
+                    .expect("Failed to spawn NanomonSV parse");
+
+                if let Some(stdout) = child.stdout.take() {
+                    let reader = BufReader::new(stdout);
+                    for line in reader.lines().map_while(Result::ok) {
+                        info!("{}", line);
+                        let mut log_lock = log.lock().unwrap();
+                        log_lock.push_str(&line);
+                        log_lock.push('\n');
+                    }
+                }
+            });
+            threads_handles.push(handle);
+        }
+
+        if !Path::new(&mrd_info_vcf).exists() {
+            let bin = self.config.bin.clone();
+            let reference = self.config.reference.clone();
+            let mrd_bam = self.diag_bam.clone();
+            let log = log.clone();
+            let mrd_out_prefix = mrd_out_prefix.clone();
+
+            let handle = thread::spawn(move || {
+                let mut child = Command::new(bin)
+                    .args([
+                        "parse",
+                        "--reference_fasta",
+                        &reference,
+                        &mrd_bam,
+                        &mrd_out_prefix,
+                    ])
+                    .stdout(Stdio::inherit())
+                    .stderr(Stdio::piped())
+                    .spawn()
+                    .expect("Failed to spawn NanomonSV parse");
+
+                if let Some(stdout) = child.stderr.take() {
+                    let reader = BufReader::new(stdout);
+                    for line in reader.lines().map_while(Result::ok) {
+                        info!("{}", line);
+                        let mut log_lock = log.lock().unwrap();
+                        log_lock.push_str(&line);
+                        log_lock.push('\n');
+                    }
+                }
+                child.wait().unwrap();
+            });
+            threads_handles.push(handle);
+        }
+
+        // Wait for all threads to finish
+        for handle in threads_handles {
+            handle.join().expect("Thread panicked");
+        }
+
+        // Get
+        let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
+        let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
+
+        if !Path::new(&mrd_result_vcf).exists() {
+            let mut child = Command::new(&self.config.bin)
+                .args([
+                    "get",
+                    "--process",
+                    &self.config.thread.to_string(),
+                    &mrd_out_prefix,
+                    &self.mrd_bam,
+                    &self.config.reference,
+                ])
+                .stdout(Stdio::inherit())
+                .stderr(Stdio::piped())
+                .spawn()
+                .expect("Failed to spawn nanomonsv");
+
+            if let Some(stdout) = child.stderr.take() {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines().map_while(Result::ok) {
+                    info!("{line}");
+                    let mut log_lock = log.lock().unwrap();
+                    log_lock.push_str(&line);
+                    log_lock.push('\n');
+                }
+            }
+
+            child.wait().unwrap();
+        }
+
+        if !Path::new(&diag_result_vcf).exists() {
+            let mut child = Command::new(&self.config.bin)
+                .args([
+                    "get",
+                    "--control_prefix",
+                    &mrd_out_prefix,
+                    "--control_bam",
+                    &self.mrd_bam,
+                    "--process",
+                    &self.config.thread.to_string(),
+                    &diag_out_prefix,
+                    &self.diag_bam,
+                    &self.config.reference,
+                ])
+                .stdout(Stdio::inherit())
+                .stderr(Stdio::piped())
+                .spawn()
+                .expect("Failed to spawn nanomonsv");
+
+            if let Some(stdout) = child.stderr.take() {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines().map_while(Result::ok) {
+                    info!("{line}");
+                    let mut log_lock = log.lock().unwrap();
+                    log_lock.push_str(&line);
+                    log_lock.push('\n');
+                }
+            }
+
+            child.wait().unwrap();
+        }
+
+        let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
+        if !Path::new(&vcf_passed).exists() {
+            Bcftools::new(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).keep_pass();
+        }
+    }
+}

+ 75 - 4
src/lib.rs

@@ -1,12 +1,15 @@
 use std::{collections::HashMap, path::PathBuf};
 
 use bam::BamCollection;
+use callers::{clairs::{ClairS, ClairSConfig}, deep_variant::{DeepVariant, DeepVariantConfig}, nanomonsv::{NanomonSV, NanomonSVConfig}};
 use commands::dorado::{Dorado, Run};
 use config::Config;
 use log::{info, warn};
 use pod5::{FlowCellCase, Pod5Collection, Pod5Type};
 use vcf::VcfCollection;
 
+use crate::vcf::Vcf;
+
 pub mod bam;
 pub mod commands;
 pub mod config;
@@ -84,6 +87,38 @@ impl Collections {
             .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
 
         // Variant calling
+        self.vcf.sort_by_id();
+        let mut vcf_by_ids: HashMap<String, Vec<Vcf>> = HashMap::new();
+        self.vcf.vcfs.iter().for_each(|v| {
+            vcf_by_ids.entry(v.id.clone()).or_default().push(v.clone());
+        });
+
+        vcf_by_ids.iter().for_each(|(k, v)| {
+            if v.len() != 5 {
+                if let (Some(diag), Some(mrd)) = (self.bam.get(k, "diag").first(), self.bam.get(k, "mrd").first()) {
+
+                    println!("{v:#?}");
+                    let diag_bam = diag.path.to_str().unwrap().to_string();
+                    let mrd_bam = mrd.path.to_str().unwrap().to_string();
+
+                    let has_clairs = v.iter().any(|v| v.caller == "clairs");
+                    let has_clairs_indel = v.iter().any(|v| v.caller == "clairs_indel");
+                    if !has_clairs || !has_clairs_indel {
+                        self.tasks.push(CollectionsTasks::ClairS { id: k.to_string(), diag_bam: diag_bam.clone(), mrd_bam: mrd_bam.clone(), config: ClairSConfig::default() });
+                    }
+                    if !v.iter().any(|v| v.caller == "DeepVariant" && v.time_point == "diag") {
+                        self.tasks.push(CollectionsTasks::DeepVariant { id: k.to_string(), time_point: "diag".to_string(), bam: diag_bam.clone(), config: DeepVariantConfig::default() })
+                    }
+                    if !v.iter().any(|v| v.caller == "DeepVariant" && v.time_point == "mrd") {
+                        self.tasks.push(CollectionsTasks::DeepVariant { id: k.to_string(), time_point: "mrd".to_string(), bam: mrd_bam.clone(), config: DeepVariantConfig::default() })
+                    }
+                    if !v.iter().any(|v| v.caller == "nanomonsv") {
+                        self.tasks.push(CollectionsTasks::NanomonSV { id: k.to_string(), diag_bam: diag_bam.clone(), mrd_bam: mrd_bam.clone(), config: NanomonSVConfig::default() })
+                    }
+                };
+            }
+        });
+        
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {
@@ -114,7 +149,9 @@ impl Collections {
 pub enum CollectionsTasks {
     Align(FlowCellCase),
     DemuxAlign(Vec<FlowCellCase>),
-    // DeepVariant()
+    DeepVariant {id: String, time_point: String, bam: String, config: DeepVariantConfig},
+    ClairS {id: String, diag_bam: String, mrd_bam: String, config: ClairSConfig},
+    NanomonSV {id: String, diag_bam: String, mrd_bam: String, config: NanomonSVConfig},
 }
 
 impl Run for CollectionsTasks {
@@ -126,6 +163,15 @@ impl Run for CollectionsTasks {
             CollectionsTasks::DemuxAlign(cases) => {
                 Dorado::from_mux(cases, Config::default())?;
             },
+            CollectionsTasks::DeepVariant { id, time_point, bam, config } => {
+                DeepVariant::new(&id, &time_point, &bam, config).run();
+            },
+            CollectionsTasks::ClairS { id, diag_bam, mrd_bam, config } => {
+                ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
+            },
+            CollectionsTasks::NanomonSV { id, diag_bam, mrd_bam, config } => {
+                NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
+            },
         }
         Ok(())
     }
@@ -136,7 +182,7 @@ mod tests {
     use self::{callers::deep_variant::DeepVariantConfig, commands::dorado};
 
     use super::*;
-    use crate::{bam::BamType, callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant}};
+    use crate::{bam::BamType, callers::{clairs::{ClairS, ClairSConfig}, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVConfig}}};
 
     #[test]
     fn it_works() {
@@ -185,7 +231,6 @@ mod tests {
     fn vcf() -> anyhow::Result<()> {
         let mut vcf_collection = VcfCollection::new("/data/longreads_basic_pipe");
         vcf_collection.sort_by_id();
-
         vcf_collection
             .vcfs
             .iter()
@@ -223,15 +268,41 @@ mod tests {
         Ok(())
     }
 
+    #[test_log::test]
+    fn nanomonsv() -> anyhow::Result<()> {
+        let config = NanomonSVConfig {
+            result_dir: "/data/test".to_string(),
+            ..NanomonSVConfig::default()
+        };
+        NanomonSV::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run();
+        Ok(())
+    }
+
     // cargo test run -- --nocapture; ~/run_scripts/notify_finish.sh &
     #[test_log::test]
-    fn run() -> anyhow::Result<()> {
+    fn task() -> anyhow::Result<()> {
+        let mut collections = Collections::new(
+            "/data/run_data",
+            "/data/flow_cells.tsv",
+            "/data/longreads_basic_pipe",
+        )?;
+        collections.todo();
+        println!("{:#?}", collections.tasks);
+        // collections.run()?;
+        Ok(())
+    }
+
+    #[test_log::test]
+    fn run_tasks() -> anyhow::Result<()> {
         let mut collections = Collections::new(
             "/data/run_data",
             "/data/flow_cells.tsv",
             "/data/longreads_basic_pipe",
         )?;
+        collections.todo();
+        println!("{:#?}", collections.tasks);
         collections.run()?;
         Ok(())
     }
+
 }

+ 13 - 2
src/runners.rs

@@ -16,7 +16,6 @@ pub struct DockerRun {
 
 impl DockerRun {
     pub fn run(args: &[&str]) -> Self {
-        
         let docker_run = DockerRun {
             args: args.iter().map(|e| e.to_string()).collect(),
             ..Default::default()
@@ -56,6 +55,18 @@ impl DockerRun {
         let log_id = id.clone();
         let logs_clone = Arc::clone(&docker_run.logs);
         let _loger_thread = thread::spawn(move || {
+            let output = Command::new("docker")
+                .args(["inspect", "--format='{{.Config.Image}}'", &log_id])
+                .output()
+                .expect("Failed to execute Docker inspect");
+            let container_name = if output.status.success() {
+                String::from_utf8(output.stdout)
+                    .expect("Failed to convert output to string")
+                    .trim() // Trim to remove any trailing newlines
+                    .to_string()
+            } else {
+                "".to_string()
+            };
             let mut child = Command::new("docker")
                 .args(["logs", "--follow", &log_id])
                 .stdout(Stdio::piped())
@@ -66,7 +77,7 @@ impl DockerRun {
             if let Some(stdout) = child.stdout.take() {
                 let reader = BufReader::new(stdout);
                 for line in reader.lines().map_while(Result::ok) {
-                    info!("{}", line);
+                    info!("[{container_name}] {line}");
                     let mut logs_lock = logs_clone.lock().unwrap();
                     logs_lock.push_str(&line);
                     logs_lock.push('\n');

+ 1 - 1
src/vcf.rs

@@ -8,7 +8,7 @@ use std::{fs::Metadata, os::unix::fs::MetadataExt, path::PathBuf};
 use noodles_csi as csi;
 use num_format::{Locale, ToFormattedString};
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Vcf {
     pub id: String,
     pub caller: String,