Browse Source

docker runner, deep_var

Thomas 1 year ago
parent
commit
63fc4f96cf
10 changed files with 344 additions and 80 deletions
  1. 44 2
      Cargo.lock
  2. 3 0
      Cargo.toml
  3. 15 11
      src/bam.rs
  4. 134 0
      src/callers/deep_variant.rs
  5. 3 0
      src/callers/mod.rs
  6. 9 10
      src/commands/dorado.rs
  7. 1 1
      src/config.rs
  8. 31 16
      src/lib.rs
  9. 6 40
      src/pod5.rs
  10. 98 0
      src/runners.rs

+ 44 - 2
Cargo.lock

@@ -51,6 +51,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "allocator-api2"
+version = "0.2.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
+
 [[package]]
 name = "android-tzdata"
 version = "0.1.1"
@@ -683,7 +689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed"
 dependencies = [
  "borsh-derive",
- "cfg_aliases",
+ "cfg_aliases 0.2.1",
 ]
 
 [[package]]
@@ -797,6 +803,12 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "cfg_aliases"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
+
 [[package]]
 name = "cfg_aliases"
 version = "0.2.1"
@@ -961,6 +973,16 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "ctrlc"
+version = "3.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "672465ae37dc1bc6380a6547a8883d5dd397b0f1faaad4f265726cc7042a5345"
+dependencies = [
+ "nix 0.28.0",
+ "windows-sys",
+]
+
 [[package]]
 name = "curl-sys"
 version = "0.4.74+curl-8.9.0"
@@ -1298,6 +1320,11 @@ name = "hashbrown"
 version = "0.14.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+dependencies = [
+ "ahash 0.8.11",
+ "allocator-api2",
+ "rayon",
+]
 
 [[package]]
 name = "heck"
@@ -1713,6 +1740,18 @@ dependencies = [
  "pin-utils",
 ]
 
+[[package]]
+name = "nix"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
+dependencies = [
+ "bitflags 2.6.0",
+ "cfg-if",
+ "cfg_aliases 0.1.1",
+ "libc",
+]
+
 [[package]]
 name = "nix"
 version = "0.29.0"
@@ -1721,7 +1760,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
 dependencies = [
  "bitflags 2.6.0",
  "cfg-if",
- "cfg_aliases",
+ "cfg_aliases 0.2.1",
  "libc",
 ]
 
@@ -2012,10 +2051,12 @@ dependencies = [
  "byte-unit",
  "chrono",
  "csv",
+ "ctrlc",
  "duct",
  "env_logger 0.11.5",
  "expectrl",
  "glob",
+ "hashbrown 0.14.5",
  "locale_config",
  "log",
  "logtest",
@@ -2027,6 +2068,7 @@ dependencies = [
  "pandora_lib_pod5",
  "pty-process",
  "ptyprocess",
+ "rayon",
  "regex",
  "serde",
  "test-log",

+ 3 - 0
Cargo.toml

@@ -29,3 +29,6 @@ expectrl = "0.7.1"
 ptyprocess = "0.4.1"
 duct = "0.13.7"
 uuid = { version = "1.10.0", features = ["v4"] }
+rayon = "1.10.0"
+hashbrown = { version = "0.14.5", features = ["rayon"] }
+ctrlc = "3.4.4"

+ 15 - 11
src/bam.rs

@@ -12,6 +12,7 @@ use pandora_lib_bindings::{
     progs::cramino::{Cramino, CraminoRes},
     utils::RunBin,
 };
+use rayon::prelude::*;
 
 #[derive(Debug)]
 pub struct Bam {
@@ -137,18 +138,21 @@ impl BamCollection {
 }
 
 pub fn load_bam_collection(result_dir: &str) -> BamCollection {
-    let mut bams = Vec::new();
     let pattern = format!("{}/*/*/*.bam", result_dir);
-
-    for entry in glob(&pattern).expect("Failed to read glob pattern") {
-        match entry {
-            Ok(path) => match Bam::new(path) {
-                Ok(bam) => bams.push(bam),
-                Err(e) => warn!("{e}"),
-            },
-            Err(e) => warn!("Error: {:?}", e),
-        }
-    }
+    let bams = glob(&pattern)
+        .expect("Failed to read glob pattern")
+        .par_bridge()
+        .filter_map(|entry| {
+            match entry {
+                Ok(path) => match Bam::new(path) {
+                    Ok(bam) => return Some(bam),
+                    Err(e) => warn!("{e}"),
+                },
+                Err(e) => warn!("Error: {:?}", e),
+            }
+            None
+        })
+        .collect();
 
     BamCollection { bams }
 }

+ 134 - 0
src/callers/deep_variant.rs

@@ -0,0 +1,134 @@
+use std::{
+    fs,
+    io::{BufRead, BufReader},
+    path::Path,
+    process::{Child, Command, Stdio},
+    sync::{Arc, Mutex},
+    thread,
+};
+
+use duct::cmd;
+use log::{info, warn};
+
+use crate::runners::{self, DockerRun};
+
+#[derive(Debug)]
+pub struct DeepVariantConfig {
+    pub bin_version: String,
+    pub threads: u8,
+    pub model_type: String,
+    pub result_dir: String,
+    pub reference: String,
+    pub bcftools: String,
+}
+
+impl Default for DeepVariantConfig {
+    fn default() -> Self {
+        Self {
+            bin_version: "1.6.1".to_string(),
+            threads: 155,
+            model_type: "ONT_R104".to_string(),
+            result_dir: "/data/longreads_basic_pipe".to_string(),
+            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+            bcftools: "/data/tools/bcftools-1.18/bcftools".to_string(),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct DeepVariant {
+    pub id: String,
+    pub time_point: String,
+    pub bam: String,
+    pub config: DeepVariantConfig,
+}
+
+impl DeepVariant {
+    pub fn new(id: &str, time_point: &str, bam: &str, config: DeepVariantConfig) -> Self {
+        Self {
+            id: id.to_string(),
+            time_point: time_point.to_string(),
+            bam: bam.to_string(),
+            config,
+        }
+    }
+
+    pub fn run(self) -> Self {
+        // Create out dir
+        let output_dir = format!(
+            "{}/{}/{}/DeepVariant",
+            self.config.result_dir, self.id, self.time_point
+        );
+        if !Path::new(&output_dir).exists() {
+            fs::create_dir_all(&output_dir).expect("Failed to create output directory");
+        }
+        let output_vcf = format!(
+            "{output_dir}/{}_{}_DeepVariant.vcf.gz",
+            self.id, self.time_point
+        );
+
+        // Run Docker command if output VCF doesn't exist
+        if !Path::new(&output_vcf).exists() {
+            let docker_run = DockerRun::run(&[
+                "run",
+                "-d",
+                "-v",
+                "/data:/data", // <---
+                "-v",
+                &format!("{}:/output", output_dir),
+                &format!("google/deepvariant:{}", self.config.bin_version),
+                "/opt/deepvariant/bin/run_deepvariant",
+                "--model_type=ONT_R104",
+                "--ref",
+                &self.config.reference,
+                "--reads",
+                &self.bam,
+                "--output_vcf",
+                &format!("/output/{}_{}_DeepVariant.vcf.gz", self.id, self.time_point),
+                "--output_gvcf",
+                &format!(
+                    "/output/{}_{}_DeepVariant.g.vcf.gz",
+                    self.id, self.time_point
+                ),
+                "--num_shards=155",
+                "--logging_dir",
+                &format!("/output/{}_{}_DeepVariant_logs", self.id, self.time_point),
+                "--dry_run=false",
+                "--sample_name",
+                &format!("{}_{}", self.id, self.time_point),
+            ]);
+            docker_run.wait();
+            let log = docker_run.log();
+            println!("{log}");
+        }
+
+        // Keep PASS
+        let vcf_passed = format!(
+            "{}/{}_{}_DeepVariant_PASSED.vcf.gz",
+            output_dir, self.id, self.time_point
+        );
+        if !Path::new(&vcf_passed).exists() {
+            let status = Command::new(&self.config.bcftools)
+                .args([
+                    "view",
+                    "--write-index",
+                    "--threads",
+                    "20",
+                    "-i",
+                    "FILTER='PASS'",
+                    &output_vcf,
+                    "-o",
+                    &vcf_passed,
+                ])
+                .stdout(Stdio::inherit())
+                .stderr(Stdio::inherit())
+                .status()
+                .expect("Failed to filter VCF with bcftools");
+
+            if !status.success() {
+                warn!("bcftools command failed with status: {}", status);
+            }
+        }
+        self
+    }
+}

+ 3 - 0
src/callers/mod.rs

@@ -0,0 +1,3 @@
+
+pub mod deep_variant;
+

+ 9 - 10
src/commands/dorado.rs

@@ -40,7 +40,7 @@ pub struct Dorado {
 
 impl Dorado {
     pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
-        let data_dir = "/data/longreads_basic_pipe";
+        let data_dir = &config.result_dir;
         let case_dir = format!("{}/{}", data_dir, case.id);
         let time_dir = format!("{}/{}", case_dir, case.time_point);
         let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
@@ -100,7 +100,7 @@ impl Dorado {
         info!("Running: {pipe}");
 
         let pipe_cmd = cmd!("bash", "-c", pipe);
-        let mut reader = pipe_cmd.stdout_capture().reader()?;
+        let mut reader = pipe_cmd.stderr_capture().reader()?;
 
         let mut buffer = [0; 1];
         let mut line = String::new();
@@ -110,7 +110,7 @@ impl Dorado {
                 Ok(0) => break, // End of output
                 Ok(_) => {
                     let char = buffer[0] as char;
-                    eprint!("{}", char);
+                    eprint!("-{}", char);
                     std::io::stderr().flush()?;
 
                     if char == '\n' {
@@ -139,7 +139,7 @@ impl Dorado {
         Ok(())
     }
 
-    fn run_cramino(&self) -> anyhow::Result<()> {
+    pub fn run_cramino(&self) -> anyhow::Result<()> {
         let cramino_out = format!(
             "{}/{}_{}_hs1_cramino.txt",
             self.time_dir, self.case.id, self.case.time_point
@@ -150,8 +150,6 @@ impl Dorado {
             "cramino",
             "-t",
             "150",
-            "--hist",
-            "--checksum",
             "--karyotype",
             &self.bam
         )
@@ -164,7 +162,7 @@ impl Dorado {
         Ok(())
     }
 
-    fn run_modkit(&self) -> anyhow::Result<()> {
+    pub fn run_modkit(&self) -> anyhow::Result<()> {
         let mod_summary = format!(
             "{}/{}_{}_5mC_5hmC_summary.txt",
             self.time_dir, self.case.id, self.case.time_point
@@ -181,7 +179,7 @@ impl Dorado {
         Ok(())
     }
 
-    fn create_fastq(&self) -> anyhow::Result<()> {
+    pub fn create_fastq(&self) -> anyhow::Result<()> {
         let bam = &self.bam;
         let fastq = format!(
             "{}/{}/{}/{}_{}.fastq.gz",
@@ -198,7 +196,7 @@ impl Dorado {
         Ok(())
     }
 
-    fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
+    pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
         let composition_a: Vec<String> =
             pandora_lib_pileup::bam_compo(bam.to_string_lossy().as_ref(), 20000)?
                 .iter()
@@ -213,7 +211,8 @@ impl Dorado {
             .filter(|id| composition_b.contains(id))
             .count();
         if n_id > 0 {
-            return Err(anyhow::anyhow!("Already merged"));
+            warn!("{} is already merged", self.case.id);
+            return Ok(());
         }
 
         let into = PathBuf::from(&self.bam);

+ 1 - 1
src/config.rs

@@ -33,7 +33,7 @@ impl Default for AlignConfig {
             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
             samtools_view_threads: 20,
-            samtools_sort_threads: 30,
+            samtools_sort_threads: 50,
         }
     }
 }

+ 31 - 16
src/lib.rs

@@ -13,6 +13,8 @@ pub mod config;
 pub mod modkit;
 pub mod pod5;
 mod vcf;
+pub mod callers;
+pub mod runners;
 
 #[derive(Debug)]
 pub struct Collections {
@@ -24,7 +26,7 @@ pub struct Collections {
 
 impl Collections {
     pub fn new(pod_dir: &str, corrected_fc_path: &str, result_dir: &str) -> anyhow::Result<Self> {
-        let pod5 = Pod5Collection::import_dir(pod_dir, corrected_fc_path, result_dir)?;
+        let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
         let bam = BamCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
 
@@ -69,6 +71,7 @@ impl Collections {
             }
         }
 
+        // Group for muxed and push task with all cases
         let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
         for case in to_demux {
             grouped
@@ -79,6 +82,8 @@ impl Collections {
         grouped
             .into_values()
             .for_each(|data| self.tasks.push(CollectionsTasks::DemuxAlign(data)));
+
+        // Variant calling
     }
 
     pub fn run(&mut self) -> anyhow::Result<()> {
@@ -109,6 +114,7 @@ impl Collections {
 pub enum CollectionsTasks {
     Align(FlowCellCase),
     DemuxAlign(Vec<FlowCellCase>),
+    // DeepVariant()
 }
 
 impl Run for CollectionsTasks {
@@ -127,10 +133,10 @@ impl Run for CollectionsTasks {
 
 #[cfg(test)]
 mod tests {
-    use self::commands::dorado;
+    use self::{callers::deep_variant::DeepVariantConfig, commands::dorado};
 
     use super::*;
-    use crate::bam::BamType;
+    use crate::{bam::BamType, callers::deep_variant::DeepVariant};
 
     #[test]
     fn it_works() {
@@ -150,7 +156,7 @@ mod tests {
         let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
             .build();
 
-        let _ = Pod5Collection::import_dir(
+        let _ = Pod5Collection::new(
             "/data/run_data",
             "/data/flow_cells.tsv",
             "/data/longreads_basic_pipe",
@@ -189,24 +195,33 @@ mod tests {
     }
 
     #[test_log::test]
-    fn collections() -> anyhow::Result<()> {
+    fn mux() -> anyhow::Result<()> {
+        let cases = vec![
+            FlowCellCase { id: "test_04".to_string(), time_point: "diag".to_string(), barcode: "04".to_string(), pod_dir: "/data/test_d".into() },
+            FlowCellCase { id: "test_05".to_string(), time_point: "diag".to_string(), barcode: "05".to_string(), pod_dir: "/data/test_d".into() },
+        ];
+        Dorado::from_mux(cases, Config::default())
+    }
+
+    #[test_log::test]
+    fn deep_variant() -> anyhow::Result<()> {
+        let config = DeepVariantConfig {
+            result_dir: "/data/test".to_string(),
+            ..DeepVariantConfig::default()
+        };
+        let dv = DeepVariant::new("test_a", "diag", "/data/test_data/subset.bam", config).run();
+        Ok(())
+    }
+
+    // cargo test run -- --nocapture; ~/run_scripts/notify_finish.sh &
+    #[test_log::test]
+    fn run() -> anyhow::Result<()> {
         let mut collections = Collections::new(
             "/data/run_data",
             "/data/flow_cells.tsv",
             "/data/longreads_basic_pipe",
         )?;
-        // collections.vcf.print_tsv();
         collections.run()?;
         Ok(())
     }
-
-    #[test_log::test]
-    fn mux() -> anyhow::Result<()> {
-        
-        let cases = vec![
-            FlowCellCase { id: "test_04".to_string(), time_point: "diag".to_string(), barcode: "04".to_string(), pod_dir: "/data/test_d".into() },
-            FlowCellCase { id: "test_05".to_string(), time_point: "diag".to_string(), barcode: "05".to_string(), pod_dir: "/data/test_d".into() },
-        ];
-        Dorado::from_mux(cases, Config::default())
-    }
 }

+ 6 - 40
src/pod5.rs

@@ -4,15 +4,17 @@ use csv::ReaderBuilder;
 use glob::glob;
 use log::{info, warn};
 use pandora_lib_pod5::Pod5Info;
+use rayon::iter::ParallelBridge;
 use serde::Deserialize;
 use std::{
-    collections::HashMap,
     fmt::Display,
     fs::{self, File, Metadata},
     os::unix::fs::MetadataExt,
     path::PathBuf,
     usize,
 };
+use hashbrown::HashMap;
+use rayon::prelude::*;
 
 #[derive(Debug, Clone)]
 pub struct Pod5 {
@@ -178,44 +180,8 @@ pub struct FlowCellCase {
     // pub basecalled: Option<bool>,
 }
 
-impl FlowCellCase {
-    // pub fn basecalled(&mut self, bam_dir: &str, acquisition_id: String) -> bool {
-    //     if let Some(b) = self.basecalled {
-    //         return b;
-    //     } else if let std::result::Result::Ok(p) = PathBuf::from_str(&format!(
-    //         "{bam_dir}/{}/{}/{}_{}_hs1.bam",
-    //         self.id,
-    //         self.time_point.to_lowercase(),
-    //         self.id,
-    //         self.time_point.to_lowercase()
-    //     )) {
-    //         if p.exists() {
-    //             let has_id = pandora_lib_pileup::bam_compo(p.to_str().unwrap(), 20000)
-    //                 .unwrap()
-    //                 .iter()
-    //                 .flat_map(|(rg, _)| {
-    //                     if let Some(index) = rg.find('_') {
-    //                         let fc_id: &str = &rg[..index];
-    //                         vec![fc_id.to_string()]
-    //                     } else {
-    //                         vec![]
-    //                     }
-    //                 })
-    //                 .filter(|i| *i == acquisition_id)
-    //                 .count()
-    //                 > 0;
-    //             if has_id {
-    //                 self.basecalled = Some(true);
-    //                 return true;
-    //             }
-    //         }
-    //     }
-    //     false
-    // }
-}
-
 impl Pod5Collection {
-    pub fn import_dir(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
+    pub fn new(pod5_dir: &str, corrected_fc_path: &str, bam_dir: &str) -> Result<Self> {
         let pod5 = list_pod_files(pod5_dir)?;
         info!("n pod5 {}", pod5.len());
 
@@ -227,7 +193,7 @@ impl Pod5Collection {
 
         let corrected_fc = load_flowcells_corrected_names(corrected_fc_path)?;
         let flow_cells: Vec<FlowCell> = fc
-            .into_values()
+            .par_values()
             .map(|v| {
                 let first = &v[0];
                 let pod5_info = Pod5Info::from_pod5(first.path.to_str().unwrap());
@@ -287,7 +253,7 @@ impl Pod5Collection {
                     run_name: first.run_name.clone(),
                     pod5_type: first.pod5_type.clone(),
                     pod5_info,
-                    pod5: v,
+                    pod5: v.to_vec(),
                 }
             })
             .collect();

+ 98 - 0
src/runners.rs

@@ -0,0 +1,98 @@
+use std::{
+    io::{BufRead, BufReader},
+    process::{Command, Stdio},
+    sync::{Arc, Mutex},
+    thread,
+};
+
+use log::{info, warn};
+
+#[derive(Debug, Default)]
+pub struct DockerRun {
+    pub args: Vec<String>,
+    pub container_id: Arc<Mutex<Option<String>>>,
+    pub logs: Arc<Mutex<String>>,
+}
+
+impl DockerRun {
+    pub fn run(args: &[&str]) -> Self {
+        
+        let docker_run = DockerRun {
+            args: args.iter().map(|e| e.to_string()).collect(),
+            ..Default::default()
+        };
+
+
+        // Setup Ctrl-C handler
+        {
+            let container_id = Arc::clone(&docker_run.container_id);
+            ctrlc::set_handler(move || {
+                if let Ok(container_id) = container_id.lock() {
+                    if let Some(ref id) = *container_id {
+                        warn!("Stopping Docker container...");
+                        let _ = Command::new("docker").args(["stop", id]).status();
+                    }
+                }
+                std::process::exit(1);
+            })
+            .expect("Error setting Ctrl-C handler");
+        }
+
+        // Spawn the main command
+        let output = Command::new("docker")
+            .args(&docker_run.args)
+            .stdout(Stdio::piped())
+            .stderr(Stdio::inherit())
+            .output()
+            .expect("Failed to run Docker command");
+
+        // add id to Arc
+        let id = String::from_utf8_lossy(&output.stdout).trim().to_string();
+        {
+            let mut container_id_lock = docker_run.container_id.lock().unwrap();
+            *container_id_lock = Some(id.clone());
+        }
+
+        // Spawn a thread to follow the logs
+        let log_id = id.clone();
+        let logs_clone = Arc::clone(&docker_run.logs);
+        let _loger_thread = thread::spawn(move || {
+            let mut child = Command::new("docker")
+                .args(["logs", "--follow", &log_id])
+                .stdout(Stdio::piped())
+                .stderr(Stdio::inherit())
+                .spawn()
+                .expect("Failed to follow Docker logs");
+
+            if let Some(stdout) = child.stdout.take() {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines().map_while(Result::ok) {
+                    info!("{}", line);
+                    let mut logs_lock = logs_clone.lock().unwrap();
+                    logs_lock.push_str(&line);
+                    logs_lock.push('\n');
+                }
+            }
+        });
+        docker_run
+    }
+    // Wait for the container to finish
+    pub fn wait(&self) {
+        if let Ok(container_id) = self.container_id.lock() {
+            if let Some(ref id) = *container_id {
+                let status = Command::new("docker")
+                    .args(["wait", id])
+                    .status()
+                    .expect("Failed to wait on Docker container");
+                if !status.success() {
+                    warn!("Docker command failed with status: {}", status);
+                }
+            }
+        }
+    }
+
+    pub fn log(&self) -> String {
+        let logs_lock = self.logs.lock().unwrap();
+        logs_lock.to_string()
+    }
+}