Thomas 1 vuosi sitten
vanhempi
commit
9ceab0f209
6 muutettua tiedostoa jossa 101 lisäystä ja 228 poistoa
  1. 2 0
      Cargo.lock
  2. 2 1
      Cargo.toml
  3. 22 5
      src/callers/clairs.rs
  4. 28 40
      src/callers/deep_variant.rs
  5. 29 177
      src/callers/nanomonsv.rs
  6. 18 5
      src/runners.rs

+ 2 - 0
Cargo.lock

@@ -825,6 +825,7 @@ dependencies = [
  "iana-time-zone",
  "js-sys",
  "num-traits",
+ "serde",
  "wasm-bindgen",
  "windows-targets 0.52.6",
 ]
@@ -2071,6 +2072,7 @@ dependencies = [
  "rayon",
  "regex",
  "serde",
+ "serde_json",
  "test-log",
  "tracing",
  "tracing-test",

+ 2 - 1
Cargo.toml

@@ -12,9 +12,10 @@ pandora_lib_pod5 = { git = "https://git.t0m4.fr/Thomas/pandora_lib_pod5.git" }
 pandora_lib_pileup = { git = "https://git.t0m4.fr/Thomas/pandora_lib_pileup.git" }
 pandora_lib_bindings = { git = "https://git.t0m4.fr/Thomas/pandora_lib_bindings.git" }
 regex = "1.10.5"
-chrono = "0.4.38"
+chrono = { version = "0.4.38", features = ["serde"] }
 csv = "1.3.0"
 serde = { version = "1.0.204", features = ["derive"] }
+serde_json = "1.0"
 tracing-test = "0.2.5"
 tracing = "0.1.40"
 logtest = "2.0.0"

+ 22 - 5
src/callers/clairs.rs

@@ -36,6 +36,7 @@ pub struct ClairS {
     pub mrd_bam: String,
     pub config: ClairSConfig,
     pub log: String,
+    pub log_dir: String,
 }
 
 impl ClairS {
@@ -46,6 +47,7 @@ impl ClairS {
         let vcf_passed = format!("{output_dir}/{id}_diag_clairs_PASSED.vcf.gz",);
         let indel_vcf_passed = format!("{output_dir}/{id}_diag_clairs_indel_PASSED.vcf.gz");
 
+        let log_dir = format!("{}/{}/log/ClairS", config.result_dir, id);
         Self {
             id: id.to_string(),
             output_dir,
@@ -57,6 +59,7 @@ impl ClairS {
             output_indel,
             log: String::default(),
             indel_vcf_passed,
+            log_dir,
         }
     }
 
@@ -67,6 +70,9 @@ impl ClairS {
         } else {
             info!("ClairS output directory exists");
         }
+        // Run reports are saved under log_dir, so it must exist before any save_to_file call.
+        if !Path::new(&self.log_dir).exists() {
+            fs::create_dir_all(&self.log_dir).expect("Failed to create log directory");
+        }
 
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
@@ -96,25 +102,36 @@ impl ClairS {
                 "-s",
                 &format!("{}_diag", self.id),
             ]);
-            self.log = run_wait(&mut docker_run).unwrap().log;
+            let report = run_wait(&mut docker_run).unwrap();
+            report
+                .save_to_file(&format!("{}/clairs_", self.log_dir))
+                .unwrap();
         } else {
             info!("ClairS output vcf already exists");
         }
 
         // Keep PASS
         if !Path::new(&self.vcf_passed).exists() {
-            bcftools_keep_pass(
+            let report = bcftools_keep_pass(
                 &self.output_vcf,
                 &self.vcf_passed,
                 BcftoolsConfig::default(),
-            ).unwrap();
+            )
+            .unwrap();
+            // Trailing underscore keeps the prefix separate from the id suffix save_to_file appends.
+            report
+                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
+                .unwrap();
         }
         if !Path::new(&self.indel_vcf_passed).exists() {
-            bcftools_keep_pass(
+            let report = bcftools_keep_pass(
                 &self.output_indel,
                 &self.indel_vcf_passed,
                 BcftoolsConfig::default(),
-            ).unwrap();
+            )
+            .unwrap();
+            // Trailing underscore keeps the prefix separate from the id suffix save_to_file appends.
+            report
+                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
+                .unwrap();
         }
     }
 }

+ 28 - 40
src/callers/deep_variant.rs

@@ -1,11 +1,10 @@
-use std::{
-    fs,
-    path::Path,
-    process::{Command, Stdio},
-};
-use log::{info, warn};
+use log::info;
+use std::{fs, path::Path};
 
-use crate::runners::{run_wait, DockerRun};
+use crate::{
+    commands::bcftools::{bcftools_keep_pass, BcftoolsConfig},
+    runners::{run_wait, DockerRun},
+};
 
 #[derive(Debug)]
 pub struct DeepVariantConfig {
@@ -39,23 +38,19 @@ pub struct DeepVariant {
     pub output_vcf: String,
     pub vcf_passed: String,
     pub log: String,
+    pub log_dir: String,
     pub config: DeepVariantConfig,
 }
 
 impl DeepVariant {
     pub fn new(id: &str, time_point: &str, bam: &str, config: DeepVariantConfig) -> Self {
-        let output_dir = format!(
-            "{}/{}/{}/DeepVariant",
-            config.result_dir, id, time_point
-        );
-        let output_vcf = format!(
-            "{output_dir}/{}_{}_DeepVariant.vcf.gz",
-            id, time_point
-        );
+        let output_dir = format!("{}/{}/{}/DeepVariant", config.result_dir, id, time_point);
+        let output_vcf = format!("{output_dir}/{}_{}_DeepVariant.vcf.gz", id, time_point);
         let vcf_passed = format!(
             "{}/{}_{}_DeepVariant_PASSED.vcf.gz",
-            output_dir, id,time_point
+            output_dir, id, time_point
         );
+        let log_dir = format!("{}/{}/log/DeepVariant", config.result_dir, id);
 
         Self {
             id: id.to_string(),
@@ -66,15 +61,19 @@ impl DeepVariant {
             output_dir,
             output_vcf,
             vcf_passed,
+            log_dir,
         }
     }
 
-    pub fn run(mut self) -> Self {
+    pub fn run(&self) {
         // Create out dir
         if !Path::new(&self.output_dir).exists() {
             fs::create_dir_all(&self.output_dir).expect("Failed to create output directory");
         }
-        
+        if !Path::new(&self.log_dir).exists() {
+            fs::create_dir_all(&self.log_dir).expect("Failed to create log directory");
+        }
+
         // Run Docker command if output VCF doesn't exist
         if !Path::new(&self.output_vcf).exists() {
             let mut docker_run = DockerRun::new(&[
@@ -105,33 +104,22 @@ impl DeepVariant {
                 "--sample_name",
                 &format!("{}_{}", self.id, self.time_point),
             ]);
-            self.log = run_wait(&mut docker_run).unwrap().log;
+            let report = run_wait(&mut docker_run).unwrap();
+            report.save_to_file(&format!("{}/deepvariant_", self.log_dir)).unwrap();
         }
 
         // Keep PASS
         if !Path::new(&self.vcf_passed).exists() {
             info!("Filtering PASS variants");
-            let status = Command::new(&self.config.bcftools)
-                .args([
-                    "view",
-                    "--write-index",
-                    "--threads",
-                    "20",
-                    "-i",
-                    "FILTER='PASS'",
-                    &self.output_vcf,
-                    "-o",
-                    &self.vcf_passed,
-                ])
-                .stdout(Stdio::inherit())
-                .stderr(Stdio::inherit())
-                .status()
-                .expect("Failed to filter VCF with bcftools");
-
-            if !status.success() {
-                warn!("bcftools command failed with status: {}", status);
-            }
+            let report = bcftools_keep_pass(
+                &self.output_vcf,
+                &self.vcf_passed,
+                BcftoolsConfig::default(),
+            )
+            .unwrap();
+            report
+                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir))
+                .unwrap();
         }
-        self
     }
 }

+ 29 - 177
src/callers/nanomonsv.rs

@@ -1,67 +1,6 @@
-// reference="/data/ref/hs1/chm13v2.0.fa"
-// bcftools="/data/tools/bcftools-1.18/bcftools"
-//
-// DIAG_CASE_DIR="/data/longreads_basic_pipe/${name}/diag"
-// MRD_CASE_DIR="/data/longreads_basic_pipe/${name}/mrd"
-//
-// DIAG_OUTPUT_DIR="${DIAG_CASE_DIR}/nanomonsv"
-// MRD_OUTPUT_DIR="${MRD_CASE_DIR}/nanomonsv"
-//
-// DIAG_BAM="${DIAG_CASE_DIR}/${name}_diag_hs1.bam"
-// MRD_BAM="${MRD_CASE_DIR}/${name}_mrd_hs1.bam"
-//
-// DIAG_OUT_PREFIX="${DIAG_OUTPUT_DIR}/${name}_diag"
-// MRD_OUT_PREFIX="${MRD_OUTPUT_DIR}/${name}_mrd"
-//
-// if [ ! -d "$DIAG_OUTPUT_DIR" ]; then
-//   mkdir "$DIAG_OUTPUT_DIR"
-// fi
-//
-// if [ ! -d "$MRD_OUTPUT_DIR" ]; then
-//   mkdir "$MRD_OUTPUT_DIR"
-// fi
-//
-// # PARSE
-// if [ ! -f "${DIAG_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
-//   nanomonsv parse --reference_fasta "$reference" "$DIAG_BAM" "$DIAG_OUT_PREFIX"
-// fi
-//
-// if [ ! -f "${MRD_OUT_PREFIX}.bp_info.sorted.bed.gz" ]; then
-//   echo "[pipe] NanomonSV parse ${MRD_BAM}"
-//   nanomonsv parse --reference_fasta "$reference" "$MRD_BAM" "$MRD_OUT_PREFIX"
-// fi
-//
-// # GET
-// if [ ! -f "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
-//   nanomonsv get --control_prefix "$MRD_OUT_PREFIX" \
-//     --control_bam "$MRD_BAM" \
-//     --processes 155 \
-//     "$DIAG_OUT_PREFIX" \
-//     "$DIAG_BAM" "$reference"
-// fi
-//
-// # GET MRD
-// if [ ! -f "${MRD_OUT_PREFIX}.nanomonsv.result.vcf" ]; then
-//   nanomonsv get \
-//     --processes 155 \
-//     "$MRD_OUT_PREFIX" \
-//     "$MRD_BAM" "$reference"
-// fi
-//
-// vcf_passed="${DIAG_OUT_PREFIX}_nanomonsv_PASSED.vcf.gz"
-// if [ ! -f "$vcf_passed" ]; then
-//   $bcftools sort "${DIAG_OUT_PREFIX}.nanomonsv.result.vcf" | \
-//     $bcftools view -s "TUMOR" --write-index --threads 20 \
-//     -i "FILTER='PASS'" \
-//     -o "$vcf_passed"
-// fi
-
 use std::{
     fs,
-    // io::{BufRead, BufReader},
     path::Path,
-    // process::{Command, Stdio},
-    // sync::{Arc, Mutex},
     thread,
 };
 
@@ -99,6 +38,7 @@ pub struct NanomonSV {
     pub diag_out_dir: String,
     pub mrd_out_dir: String,
     pub log: String,
+    pub log_dir: String,
     pub config: NanomonSVConfig,
 }
 
@@ -106,6 +46,7 @@ impl NanomonSV {
     pub fn new(id: &str, diag_bam: &str, mrd_bam: &str, config: NanomonSVConfig) -> Self {
         let diag_out_dir = format!("{}/{id}/diag/nanomonsv", &config.result_dir);
         let mrd_out_dir = format!("{}/{id}/mrd/nanomonsv", &config.result_dir);
+        let log_dir = format!("{}/{id}/log/nanomonsv", &config.result_dir);
 
         NanomonSV {
             id: id.to_string(),
@@ -115,6 +56,7 @@ impl NanomonSV {
             mrd_out_dir,
             log: String::default(),
             config,
+            log_dir,
         }
     }
 
@@ -126,6 +68,9 @@ impl NanomonSV {
         if !Path::new(&self.mrd_out_dir).exists() {
             fs::create_dir_all(&self.mrd_out_dir).unwrap();
         }
+        if !Path::new(&self.log_dir).exists() {
+            fs::create_dir_all(&self.log_dir).unwrap();
+        }
 
         // Parse
         info!("Nanomonsv Parse");
@@ -135,79 +80,31 @@ impl NanomonSV {
         let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
         let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
 
-        // let log: Arc<Mutex<String>> = Arc::new(Mutex::new(String::default()));
-
         let mut threads_handles = Vec::new();
         if !Path::new(&diag_info_vcf).exists() {
-            // let bin = self.config.bin.clone();
-            // let reference = self.config.reference.clone();
             let diag_bam = self.diag_bam.clone();
-            // let log = log.clone();
             let diag_out_prefix = diag_out_prefix.clone();
             let config = self.config.clone();
-
+            let log_dir = self.log_dir.clone();
             let handle = thread::spawn(move || {
-                nanomonsv_parse(&diag_bam, &diag_out_prefix, config).unwrap();
-                // let mut child = Command::new(bin)
-                //     .args([
-                //         "parse",
-                //         "--reference_fasta",
-                //         &reference,
-                //         &diag_bam,
-                //         &diag_out_prefix,
-                //     ])
-                //     .stdout(Stdio::piped())
-                //     .stderr(Stdio::inherit())
-                //     .spawn()
-                //     .expect("Failed to spawn NanomonSV parse");
-                //
-                // if let Some(stdout) = child.stdout.take() {
-                //     let reader = BufReader::new(stdout);
-                //     for line in reader.lines().map_while(Result::ok) {
-                //         info!("{}", line);
-                //         let mut log_lock = log.lock().unwrap();
-                //         log_lock.push_str(&line);
-                //         log_lock.push('\n');
-                //     }
-                // }
-                // child.wait().unwrap();
+                let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, config).unwrap();
+                report
+                    .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
+                    .unwrap();
             });
             threads_handles.push(handle);
         }
 
         if !Path::new(&mrd_info_vcf).exists() {
-            // let bin = self.config.bin.clone();
-            // let reference = self.config.reference.clone();
-            // let log = log.clone();
             let mrd_out_prefix = mrd_out_prefix.clone();
             let mrd_bam = self.mrd_bam.clone();
             let config = self.config.clone();
-
+            let log_dir = self.log_dir.clone();
             let handle = thread::spawn(move || {
-                nanomonsv_parse(&mrd_bam, &mrd_out_prefix, config).unwrap();
-                // let mut child = Command::new(bin)
-                //     .args([
-                //         "parse",
-                //         "--reference_fasta",
-                //         &reference,
-                //         &mrd_bam,
-                //         &mrd_out_prefix,
-                //     ])
-                //     .stdout(Stdio::inherit())
-                //     .stderr(Stdio::piped())
-                //     .spawn()
-                //     .expect("Failed to spawn NanomonSV parse");
-                //
-                // if let Some(stdout) = child.stderr.take() {
-                //     let reader = BufReader::new(stdout);
-                //     for line in reader.lines().map_while(Result::ok) {
-                //         info!("{}", line);
-                //         let mut log_lock = log.lock().unwrap();
-                //         log_lock.push_str(&line);
-                //         log_lock.push('\n');
-                //     }
-                // }
-                // child.wait().unwrap();
+                let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, config).unwrap();
+                report
+                    .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
+                    .unwrap();
             });
             threads_handles.push(handle);
         }
@@ -223,7 +120,7 @@ impl NanomonSV {
         let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&mrd_result_vcf).exists() {
-            nanomonsv_get(
+            let report = nanomonsv_get(
                 &self.mrd_bam,
                 &mrd_out_prefix,
                 None,
@@ -231,76 +128,31 @@ impl NanomonSV {
                 self.config.clone(),
             )
             .unwrap();
-            // let mut child = Command::new(&self.config.bin)
-            //     .args([
-            //         "get",
-            //         "--process",
-            //         &self.config.thread.to_string(),
-            //         &mrd_out_prefix,
-            //         &self.mrd_bam,
-            //         &self.config.reference,
-            //     ])
-            //     .stdout(Stdio::inherit())
-            //     .stderr(Stdio::piped())
-            //     .spawn()
-            //     .expect("Failed to spawn nanomonsv");
-            //
-            // if let Some(stdout) = child.stderr.take() {
-            //     let reader = BufReader::new(stdout);
-            //     for line in reader.lines().map_while(Result::ok) {
-            //         info!("{line}");
-            //         let mut log_lock = log.lock().unwrap();
-            //         log_lock.push_str(&line);
-            //         log_lock.push('\n');
-            //     }
-            // }
-            //
-            // child.wait().unwrap();
+            report
+                .save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))
+                .unwrap();
         }
 
         if !Path::new(&diag_result_vcf).exists() {
-            nanomonsv_get(
-                &self.mrd_bam,
+            let report = nanomonsv_get(
+                &self.diag_bam,
                 &diag_out_prefix,
                 Some(&self.mrd_bam),
                 Some(&mrd_out_prefix),
                 self.config.clone(),
             )
             .unwrap();
-            // let mut child = Command::new(&self.config.bin)
-            //     .args([
-            //         "get",
-            //         "--control_prefix",
-            //         &mrd_out_prefix,
-            //         "--control_bam",
-            //         &self.mrd_bam,
-            //         "--process",
-            //         &self.config.thread.to_string(),
-            //         &diag_out_prefix,
-            //         &self.diag_bam,
-            //         &self.config.reference,
-            //     ])
-            //     .stdout(Stdio::inherit())
-            //     .stderr(Stdio::piped())
-            //     .spawn()
-            //     .expect("Failed to spawn nanomonsv");
-            //
-            // if let Some(stdout) = child.stderr.take() {
-            //     let reader = BufReader::new(stdout);
-            //     for line in reader.lines().map_while(Result::ok) {
-            //         info!("{line}");
-            //         let mut log_lock = log.lock().unwrap();
-            //         log_lock.push_str(&line);
-            //         log_lock.push('\n');
-            //     }
-            // }
-            //
-            // child.wait().unwrap();
+            report
+                .save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))
+                .unwrap();
         }
 
         let vcf_passed = format!("{diag_out_prefix}_nanomonsv_PASSED.vcf.gz");
         if !Path::new(&vcf_passed).exists() {
-            bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
+            let report  = bcftools_keep_pass(&diag_result_vcf, &vcf_passed, BcftoolsConfig::default()).unwrap();
+            report
+                .save_to_file(&format!("{}/bcftools_pass_", self.log_dir,))
+                .unwrap();
         }
     }
 }

+ 18 - 5
src/runners.rs

@@ -1,12 +1,11 @@
 use std::{
-    io::{BufRead, BufReader},
-    process::{Child, Command, Stdio},
-    sync::{Arc, Mutex},
-    thread,
+    fs::File, io::{BufRead, BufReader, Write}, process::{Child, Command, Stdio}, sync::{Arc, Mutex}, thread
 };
 
 use chrono::{DateTime, Utc};
 use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
 
 pub trait Run {
     fn run(&mut self) -> anyhow::Result<()>;
@@ -31,13 +30,27 @@ pub fn run_wait<T: RunWait>(item: &mut T) -> anyhow::Result<RunReport> {
     Ok(run_report)
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub struct RunReport {
     pub start: DateTime<Utc>,
     pub end: DateTime<Utc>,
     pub log: String,
 }
 
+impl RunReport {
+    /// Writes this report as pretty-printed JSON to `{file_prefix}{id}.log`.
+    ///
+    /// `id` is the first five characters of a freshly generated v4 UUID, so
+    /// successive runs sharing a prefix normally land in distinct files.
+    /// `file_prefix` is used verbatim: include the directory and any trailing
+    /// separator (e.g. `"logs/tool_"`). The parent directory must already exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the report cannot be serialized or if the file
+    /// cannot be created or written.
+    pub fn save_to_file(&self, file_prefix: &str) -> std::io::Result<()> {
+        // serde_json::Error converts into io::Error, so `?` replaces the panic.
+        let json_data = serde_json::to_string_pretty(self)?;
+        let uuid = Uuid::new_v4().to_string();
+        // The hyphenated form is ASCII with its first hyphen at index 8, so a
+        // 5-byte slice is always in bounds and on a char boundary.
+        let file_path = format!("{}{}.log", file_prefix, &uuid[..5]);
+        let mut file = File::create(&file_path)?;
+        file.write_all(json_data.as_bytes())?;
+        Ok(())
+    }
+}
+
+
 pub trait Log {
     fn log(&self) -> String;
 }