Pārlūkot izejas kodu

static DOCKER_ID

Thomas 1 gadu atpakaļ
vecāks
revīzija
1426319ce6
8 mainīti faili ar 64 papildinājumiem un 300 dzēšanām
  1. 0 280
      :
  2. 1 0
      Cargo.lock
  3. 1 0
      Cargo.toml
  4. 1 1
      src/callers/clairs.rs
  5. 1 0
      src/collection/mod.rs
  6. 27 0
      src/collection/somatic_variants.rs
  7. 10 0
      src/lib.rs
  8. 23 19
      src/runners.rs

+ 0 - 280
:

@@ -1,280 +0,0 @@
-// reference="/data/ref/hs1/chm13v2.0.fa"
-// bcftools="/data/tools/bcftools-1.18/bcftools"
-//
-// DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
-// MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
-//
-// DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
-// MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
-//
-// DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
-// MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
-//
-// DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
-// MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
-//
-// if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
-//   mkdir "$DIAG_OUTPUT_DIR"
-// fi
-//
-// if [ ! -d "$MRD_OUTPUT_DIR" ]; then
-//   mkdir "$MRD_OUTPUT_DIR"
-// fi
-//
-// # PARSE
-// if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
-//   nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
-// fi
-//
-// if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
-//   echo "[pipe] NanomonSV parse ${MRD_BAM}"
-//   nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
-// fi
-//
-// # GET
-// if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
-//   nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
-//     --control_bam "$MRD_BAM" \
-//     --processes 155 \
-//     "$DIAG_OUT_PREFIX" \
-//     "$DIAG_BAM" "$reference"
-// fi
-//
-// # GET MRD
-// if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
-//   nanomonsv get \
-//     --processes 155 \
-//     "$MRD_OUT_PREFIX" \
-//     "$MRD_BAM" "$reference"
-// fi
-//
-// vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
-// if [ ! -f "$vcf_passed" ]; then
-//   $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
-//     $bcftools view -s "TUMOR" --write-index --threads 20 \
-//     -i "FILTER='PASS'" \
-//     -o "$vcf_passed"
-// fi
-
-use std::{
-    fs,
-    io::{BufRead, BufReader},
-    path::Path,
-    process::{Command, Stdio},
-    sync::{Arc, Mutex},
-    thread,
-};
-
-use log::info;
-
-use crate::commands::bcftools::{bcftools_keep_pass, BcftoolsConfig};
-
-#[derive(Debug)]
-pub struct NanomonSVConfig {
-    pub bin: String,
-    pub result_dir: String,
-    pub reference: String,
-    pub thread: u8,
-}
-
-impl Default for NanomonSVConfig {
-    fn default() -> Self {
-        Self {
-            bin: "nanomonsv".to_string(),
-            result_dir: "/data/longreads_basic_pipe".to_string(),
-            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            thread: 155,
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct NanomonSV {
-    pub id: String,
-    pub diag_bam: String,
-    pub mrd_bam: String,
-    pub diag_out_dir: String,
-    pub mrd_out_dir: String,
-    pub log: String,
-    pub config: NanomonSVConfig,
-}
-
-impl NanomonSV {
-    pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
-        let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
-        let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
-
-        NanomonSV {
-            id: id.to_string(),
-            diag_bam: diag_bam.to_string(),
-            mrd_bam: mrd_bam.to_string(),
-            diag_out_dir,
-            mrd_out_dir,
-            log: String::default(),
-            config,
-        }
-    }
-
-    pub fn run(&mut self) {
-        // Create directories
-        if !Path::new(&self.diag_out_dir).exists() {
-            fs::create_dir_all(&self.diag_out_dir).unwrap();
-        }
-        if !Path::new(&self.mrd_out_dir).exists() {
-            fs::create_dir_all(&self.mrd_out_dir).unwrap();
-        }
-
-        // Parse
-        let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
-        let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
-
-        let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
-        let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
-
-        let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
-
-        let mut threads_handles = Vec::new();
-        if !Path::new(&diag_info_vcf).exists() {
-            let bin = self.config.bin.clone();
-            let reference = self.config.reference.clone();
-            let diag_bam = self.diag_bam.clone();
-            let log = log.clone();
-            let diag_out_prefix = diag_out_prefix.clone();
-
-            let handle = thread::spawn(move || {
-                let mut child = Command::new(bin)
-                    .args([
-                        "parse",
-                        "--reference_fasta",
-                        &reference,
-                        &diag_bam,
-                        &diag_out_prefix,
-                    ])
-                    .stdout(Stdio::piped())
-                    .stderr(Stdio::inherit())
-                    .spawn()
-                    .expect("Failed to spawn NanomonSV parse");
-
-                if let Some(stdout) = child.stdout.take() {
-                    let reader = BufReader::new(stdout);
-                    for line in reader.lines().map_while(Result::ok) {
-                        info!("{}", line);
-                        let mut log_lock = log.lock().unwrap();
-                        log_lock.push_str(&line);
-                        log_lock.push('\n');
-                    }
-                }
-            });
-            threads_handles.push(handle);
-        }
-
-        if !Path::new(&mrd_info_vcf).exists() {
-            let bin = self.config.bin.clone();
-            let reference = self.config.reference.clone();
-            let mrd_bam = self.diag_bam.clone();
-            let log = log.clone();
-            let mrd_out_prefix = mrd_out_prefix.clone();
-
-            let handle = thread::spawn(move || {
-                let mut child = Command::new(bin)
-                    .args([
-                        "parse",
-                        "--reference_fasta",
-                        &reference,
-                        &mrd_bam,
-                        &mrd_out_prefix,
-                    ])
-                    .stdout(Stdio::inherit())
-                    .stderr(Stdio::piped())
-                    .spawn()
-                    .expect("Failed to spawn NanomonSV parse");
-
-                if let Some(stdout) = child.stderr.take() {
-                    let reader = BufReader::new(stdout);
-                    for line in reader.lines().map_while(Result::ok) {
-                        info!("{}", line);
-                        let mut log_lock = log.lock().unwrap();
-                        log_lock.push_str(&line);
-                        log_lock.push('\n');
-                    }
-                }
-                child.wait().unwrap();
-            });
-            threads_handles.push(handle);
-        }
-
-        // Wait for all threads to finish
-        for handle in threads_handles {
-            handle.join().expect("Thread panicked");
-        }
-
-        // Get
-        let diag_result_vcf = format!("{diag_out_prefix}.nanomonsv.result.vcf");
-        let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
-
-        if !Path::new(&mrd_result_vcf).exists() {
-            let mut child = Command::new(&self.config.bin)
-                .args([
-                    "get",
-                    "--process",
-                    &self.config.thread.to_string(),
-                    &mrd_out_prefix,
-                    &self.mrd_bam,
-                    &self.config.reference,
-                ])
-                .stdout(Stdio::inherit())
-                .stderr(Stdio::piped())
-                .spawn()
-                .expect("Failed to spawn nanomonsv");
-
-            if let Some(stdout) = child.stderr.take() {
-                let reader = BufReader::new(stdout);
-                for line in reader.lines().map_while(Result::ok) {
-                    info!("{line}");
-                    let mut log_lock = log.lock().unwrap();
-                    log_lock.push_str(&line);
-                    log_lock.push('\n');
-                }
-            }
-
-            child.wait().unwrap();
-        }
-
-        if !Path::new(&diag_result_vcf).exists() {
-            let mut child = Command::new(&self.config.bin)
-                .args([
-                    "get",
-                    "--control_prefix",
-                    &mrd_out_prefix,
-                    "--control_bam",
-                    &self.mrd_bam,
-                    "--process",
-                    &self.config.thread.to_string(),
-                    &diag_out_prefix,
-                    &self.diag_bam,
-                    &self.config.reference,
-                ])
-                .stdout(Stdio::inherit())
-                .stderr(Stdio::piped())
-                .spawn()
-                .expect("Failed to spawn nanomonsv");
-
-            if let Some(stdout) = child.stderr.take() {
-                let reader = BufReader::new(stdout);
-                for line in reader.lines().map_while(Result::ok) {
-                    info!("{line}");
-                    let mut log_lock = log.lock().unwrap();
-                    log_lock.push_str(&line);
-                    log_lock.push('\n');
-                }
-            }
-
-            child.wait().unwrap();
-        }
-
-        let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
-        if !Path::new(&vcf_passed).exists() {
-            bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
-        }
-    }
-}

+ 1 - 0
Cargo.lock

@@ -2058,6 +2058,7 @@ dependencies = [
  "expectrl",
  "glob",
  "hashbrown 0.14.5",
+ "lazy_static",
  "locale_config",
  "log",
  "logtest",

+ 1 - 0
Cargo.toml

@@ -33,3 +33,4 @@ uuid = { version = "1.10.0", features = ["v4"] }
 rayon = "1.10.0"
 hashbrown = { version = "0.14.5", features = ["rayon"] }
 ctrlc = "3.4.4"
+lazy_static = "1.5.0"

+ 1 - 1
src/callers/clairs.rs

@@ -71,7 +71,7 @@ impl ClairS {
             info!("ClairS output directory exists");
         }
         if !Path::new(&self.log_dir).exists() {
-            fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
+            fs::create_dir_all(&self.log_dir).expect("Failed to create output directory");
         }
 
         // Run Docker command if output VCF doesn't exist

+ 1 - 0
src/collection/mod.rs

@@ -18,6 +18,7 @@ use crate::{
 pub mod bam;
 pub mod pod5;
 pub mod vcf;
+pub mod somatic_variants;
 
 #[derive(Debug)]
 pub struct Collections {

+ 27 - 0
src/collection/somatic_variants.rs

@@ -0,0 +1,27 @@
+use std::path::PathBuf;
+
+use glob::glob;
+use log::warn;
+use rayon::prelude::*;
+
+pub struct SomaticVarCollection {
+    pub somatic_var: Vec<PathBuf>,
+}
+
+impl SomaticVarCollection {
+    pub fn new(result_dir: &str) -> anyhow::Result<Self> {
+        let pattern = format!("{}/*/*/*_variants.bytes.gz", result_dir);
+        let somatic_var = glob(&pattern)
+            .expect("Failed to read glob pattern")
+            .par_bridge()
+            .filter_map(|entry| {
+                match entry {
+                    Ok(path) => return Some(path),
+                    Err(e) => warn!("Error: {:?}", e),
+                }
+                None
+            })
+            .collect();
+        Ok(SomaticVarCollection { somatic_var })
+    }
+}

+ 10 - 0
src/lib.rs

@@ -1,3 +1,5 @@
+use std::sync::{Arc, Mutex};
+
 pub mod commands;
 pub mod config;
 pub mod modkit;
@@ -5,6 +7,14 @@ pub mod callers;
 pub mod runners;
 pub mod collection;
 
+#[macro_use]
+extern crate lazy_static;
+
+// Define DOCKER_ID lock for handling Docker kill when ctrlc is pressed
+lazy_static! {
+    static ref DOCKER_ID: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
+}
+
 #[cfg(test)]
 mod tests {
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};

+ 23 - 19
src/runners.rs

@@ -1,5 +1,9 @@
 use std::{
-    fs::File, io::{BufRead, BufReader, Write}, process::{Child, Command, Stdio}, sync::{Arc, Mutex}, thread
+    fs::File,
+    io::{BufRead, BufReader, Write},
+    process::{Child, Command, Stdio},
+    sync::{Arc, Mutex},
+    thread,
 };
 
 use chrono::{DateTime, Utc};
@@ -7,6 +11,9 @@ use log::{info, warn};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
+use crate::DOCKER_ID;
+
+
 pub trait Run {
     fn run(&mut self) -> anyhow::Result<()>;
 }
@@ -40,8 +47,7 @@ pub struct RunReport {
 impl RunReport {
     /// Serialize the RunReport to a JSON string
     pub fn save_to_file(&self, file_prefix: &str) -> std::io::Result<()> {
-        let json_data = serde_json::to_string_pretty(self)
-            .expect("Failed to serialize RunReport");
+        let json_data = serde_json::to_string_pretty(self).expect("Failed to serialize RunReport");
         let uuid = Uuid::new_v4().to_string()[..5].to_string();
         let file_path = format!("{}{}.log", file_prefix, uuid);
         let mut file = File::create(&file_path)?;
@@ -50,7 +56,6 @@ impl RunReport {
     }
 }
 
-
 pub trait Log {
     fn log(&self) -> String;
 }
@@ -76,19 +81,15 @@ impl DockerRun {
 impl Run for DockerRun {
     fn run(&mut self) -> anyhow::Result<()> {
         // Set up the Ctrl-C handler; it can't be defined twice
-        {
-            let container_id = Arc::clone(&self.container_id);
-            ctrlc::set_handler(move || {
-                if let Ok(container_id) = container_id.lock() {
-                    if let Some(ref id) = *container_id {
-                        warn!("Stopping Docker container...");
-                        let _ = Command::new("docker").args(["stop", id]).status();
-                    }
+        let _ = ctrlc::try_set_handler(move || {
+            if let Ok(container_id) = DOCKER_ID.lock() {
+                if let Some(ref id) = *container_id {
+                    warn!("Stopping Docker container...");
+                    let _ = Command::new("docker").args(["stop", id]).status();
                 }
-                std::process::exit(1);
-            })
-            .expect("Error setting Ctrl-C handler");
-        }
+            }
+            std::process::exit(1);
+        });
 
         // Spawn the main command
         let output = Command::new("docker")
@@ -101,7 +102,7 @@ impl Run for DockerRun {
         // add id to Arc
         let id = String::from_utf8_lossy(&output.stdout).trim().to_string();
         {
-            let mut container_id_lock = self.container_id.lock().unwrap();
+            let mut container_id_lock = DOCKER_ID.lock().unwrap();
             *container_id_lock = Some(id.clone());
         }
 
@@ -144,7 +145,7 @@ impl Run for DockerRun {
 
 impl Wait for DockerRun {
     fn wait(&mut self) -> anyhow::Result<()> {
-        if let Ok(container_id) = self.container_id.lock() {
+        if let Ok(container_id) = DOCKER_ID.lock() {
             if let Some(ref id) = *container_id {
                 let status = Command::new("docker")
                     .args(["wait", id])
@@ -155,6 +156,9 @@ impl Wait for DockerRun {
                 }
             }
         }
+        let mut container_id_lock = DOCKER_ID.lock().unwrap();
+        *container_id_lock = None;
+
         Ok(())
     }
 }
@@ -193,7 +197,7 @@ impl Run for CommandRun {
             .stderr(Stdio::piped())
             .spawn()
             .expect("Failed to spawn");
-        
+
         if let Some(stderr) = child.stderr.take() {
             let reader = BufReader::new(stderr);
             for line in reader.lines().map_while(Result::ok) {