Răsfoiți Sursa

README.md for jq snippets // better nanomonsv parse and variants saving

Thomas 1 an în urmă
părinte
comite
3b783dbb80
4 a modificat fișierele cu 104 adăugiri și 79 ștergeri
  1. 7 0
      src/README.md
  2. 55 47
      src/callers/nanomonsv.rs
  3. 9 2
      src/pipes/somatic.rs
  4. 33 30
      src/variant/variant_collection.rs

+ 7 - 0
src/README.md

@@ -0,0 +1,7 @@
+== jq for selecting
+
+* Somatic Variants of chrM (25) 
+```
+ zcat /data/longreads_basic_pipe/*/diag/somatic_variants.json.gz | jq -C '[.data[] | select(.position.contig == 25 and ((.annotations[] | select(.ConstitAlt != null).ConstitAlt) // 0) <= 1 ) | del(.hash) | (.vcf_variants[] |= (del(.hash,.position,.reference,.alternative) | .infos |= map(select(. != "Empty"))))]'
+
+```

+ 55 - 47
src/callers/nanomonsv.rs

@@ -126,21 +126,14 @@ impl Variants for NanomonSV {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let caller = self.caller_cat();
         let add = vec![caller.clone()];
-        info!(
-            "Loading variants from {}: {}",
-            caller, self.vcf_passed
-        );
+        info!("Loading variants from {}: {}", caller, self.vcf_passed);
 
         let variants = read_vcf(&self.vcf_passed)?;
 
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
         });
-        info!(
-            "{}, {} variants loaded.",
-            caller,
-            variants.len()
-        );
+        info!("{}, {} variants loaded.", caller, variants.len());
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
@@ -272,10 +265,7 @@ impl Variants for NanomonSVSolo {
 
         variants.par_iter().for_each(|v| {
             // let var_cat = v.alteration_category();
-            annotations.insert_update(
-                v.hash(),
-                &add,
-            );
+            annotations.insert_update(v.hash(), &add);
         });
         Ok(VariantCollection {
             variants,
@@ -306,48 +296,66 @@ pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
     let diag_info_vcf = format!("{diag_out_prefix}.bp_info.sorted.bed.gz");
     let mrd_info_vcf = format!("{mrd_out_prefix}.bp_info.sorted.bed.gz");
 
-    let mut threads_handles = Vec::new();
-    if !Path::new(&diag_info_vcf).exists() {
-        let diag_bam = s.diag_bam.clone();
-        info!("Nanomonsv parse {diag_bam}.");
-        let diag_out_prefix = diag_out_prefix.clone();
-        let config = s.config.clone();
-        let log_dir = s.log_dir.clone();
-        let handle = thread::spawn(move || {
-            let report = nanomonsv_parse(&diag_bam, &diag_out_prefix, &config).unwrap();
-            report
-                .save_to_file(&format!("{log_dir}/nanomonsv_parse_diag_"))
-                .unwrap();
-        });
-        threads_handles.push(handle);
-    } else {
-        debug!("Nanonmonsv parse results already exists: {diag_info_vcf}");
+    let threads_handles = vec![
+        spawn_parse_thread(
+            &s.diag_bam,
+            &diag_out_prefix,
+            &s.config,
+            &s.log_dir,
+            &diag_info_vcf,
+        )?,
+        spawn_parse_thread(
+            &s.mrd_bam,
+            &mrd_out_prefix,
+            &s.config,
+            &s.log_dir,
+            &mrd_info_vcf,
+        )?,
+    ];
+
+    // Wait for all threads to finish and propagate errors
+    for handle in threads_handles {
+        handle
+            .join()
+            .map_err(|_| anyhow::anyhow!("Nanomonsv Thread panicked"))??;
     }
 
-    if !Path::new(&mrd_info_vcf).exists() {
-        let mrd_bam = s.mrd_bam.clone();
-        info!("Nanomonsv parse {mrd_bam}.");
-        let mrd_out_prefix = mrd_out_prefix.clone();
+    info!("Nanomonsv parsing completed successfully.");
 
-        let config = s.config.clone();
-        let log_dir = s.log_dir.clone();
+    Ok(())
+}
+
+// Helper function to spawn a thread for parsing
+fn spawn_parse_thread(
+    bam: &str,
+    out_prefix: &str,
+    config: &Config,
+    log_dir: &str,
+    info_vcf: &str,
+) -> anyhow::Result<thread::JoinHandle<anyhow::Result<()>>> {
+    if !Path::new(info_vcf).exists() {
+        let bam = bam.to_string();
+        let out_prefix = out_prefix.to_string();
+        let config = config.clone();
+        let log_dir = log_dir.to_string();
+
+        info!("Nanomonsv parsing started for BAM: {}", bam);
         let handle = thread::spawn(move || {
-            let report = nanomonsv_parse(&mrd_bam, &mrd_out_prefix, &config).unwrap();
+            let report = nanomonsv_parse(&bam, &out_prefix, &config)
+                .with_context(|| format!("Failed to parse BAM: {}", bam))?;
+
             report
-                .save_to_file(&format!("{log_dir}/nanomonsv_parse_mrd_"))
-                .unwrap();
+                .save_to_file(&format!("{log_dir}/nanomonsv_parse_{}_", bam))
+                .with_context(|| format!("Failed to save report for BAM: {}", bam))?;
+
+            Ok(())
         });
-        threads_handles.push(handle);
-    } else {
-        debug!("Nanonmonsv parse results already exists: {mrd_info_vcf}");
-    }
 
-    // Wait for all threads to finish
-    for handle in threads_handles {
-        handle.join().expect("Thread panicked for nanomonsv parse");
+        Ok(handle)
+    } else {
+        debug!("Nanomonsv parse results already exist: {}", info_vcf);
+        Ok(thread::spawn(|| Ok(()))) // Return a dummy thread that does nothing
     }
-
-    Ok(())
 }
 
 pub fn nanomonsv_get(

+ 9 - 2
src/pipes/somatic.rs

@@ -233,9 +233,16 @@ impl SomaticStats {
 
 impl Run for Somatic {
     fn run(&mut self) -> anyhow::Result<()> {
+        let config = self.config.clone();
         let id = self.id.clone();
+
+        let result_json = format!("{}/somatic_variants.json.gz", config.tumoral_dir(&id));
+
+        if Path::new(&result_json).exists() {
+            return Err(anyhow::anyhow!("already exists"));
+        }
+
         info!("Running somatic pipe for {id}.");
-        let config = self.config.clone();
         let annotations = Arc::new(self.annotations.clone());
 
         // Stats dir
@@ -486,7 +493,7 @@ impl Run for Somatic {
         );
 
         info!("Final unique variants: {}", variants.data.len());
-        variants.save_to_json(&format!("{}/somatic_variants.json.gz", config.tumoral_dir(&id)))?;
+        variants.save_to_json(&result_json)?;
 
         Ok(())
     }

+ 33 - 30
src/variant/variant_collection.rs

@@ -239,26 +239,19 @@ impl Variants {
                     ));
                 }
                 std::cmp::Ordering::Equal => {
-                    match (
-                        self_variant.reference == other_variant.reference,
-                        self_variant.alternative == other_variant.alternative,
-                    ) {
-                        (true, true) => {
-                            let mut merged_variant = self_iter.next().unwrap();
-
-                            merged_variant
-                                .vcf_variants
-                                .push(others_iter.next().unwrap());
-                            n_merged += 1;
-                            result.push(merged_variant);
-                        }
-                        _ => {
-                            result.push(self_iter.next().unwrap());
-                            result.push(create_variant(
-                                vec![others_iter.next().unwrap()],
-                                annotations,
-                            ));
-                        }
+                    let self_variant = self_iter.next().unwrap();
+                    let other_variant = others_iter.next().unwrap();
+
+                    if self_variant.reference == other_variant.reference
+                        && self_variant.alternative == other_variant.alternative
+                    {
+                        let mut merged_variant = self_variant;
+                        merged_variant.vcf_variants.push(other_variant);
+                        n_merged += 1;
+                        result.push(merged_variant);
+                    } else {
+                        result.push(self_variant);
+                        result.push(create_variant(vec![other_variant], annotations));
                     }
                 }
             }
@@ -273,23 +266,33 @@ impl Variants {
     }
 
     pub fn save_to_json(&self, filename: &str) -> anyhow::Result<()> {
-        let json = serde_json::to_string(self)?;
-        let file = File::create(filename)?;
+        let file = File::create(filename)
+            .with_context(|| format!("Failed to create file: {}", filename))?;
         let mut writer = BGZFWriter::new(file, bgzip::Compression::default());
-        writer.write_all(json.as_bytes())?;
-        writer.close()?;
+
+        serde_json::to_writer(&mut writer, self)
+            .with_context(|| format!("Failed to serialize JSON to file: {}", filename))?;
+
+        writer
+            .close()
+            .with_context(|| format!("Failed to close BGZF writer for file: {}", filename))?;
+
+        debug!("Successfully saved variants to {}", filename);
         Ok(())
     }
 
     pub fn load_from_json(filename: &str) -> anyhow::Result<Self> {
-        let file = File::open(filename)?;
-        let mut reader = BGZFReader::new(file)?;
-        let mut json = String::new();
-        reader.read_to_string(&mut json)?;
-        let variants: Variants = serde_json::from_str(&json)?;
+        let file = File::open(filename)
+            .with_context(|| format!("Failed to open file: {}", filename))?;
+        let mut reader = BGZFReader::new(file)
+            .with_context(|| format!("Failed to create BGZF reader for file: {}", filename))?;
+
+        let variants: Self = serde_json::from_reader(&mut reader)
+            .with_context(|| format!("Failed to deserialize JSON from file: {}", filename))?;
+
+        debug!("Successfully loaded variants from {}", filename);
         Ok(variants)
     }
-
 }
 
 fn create_variant(vcf_variants: Vec<VcfVariant>, annotations: &Annotations) -> Variant {