Thomas 1 day ago
parent
commit
09c26b3b2c
1 changed files with 43 additions and 3 deletions
  1. 43 3
      src/scan/scan.rs

+ 43 - 3
src/scan/scan.rs

@@ -47,6 +47,7 @@
 //!
 use std::collections::HashMap;
 use std::path::Path;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::{fmt, fs, io::Write};
 
 use anyhow::Context;
@@ -58,6 +59,8 @@ use rust_htslib::bam::{self, IndexedReader, Read, Record};
 
 use crate::helpers::{bam_contigs, get_genome_sizes, is_file_older};
 use crate::io::bam::fb_inv_from_record;
+use crate::io::readers::get_gz_reader;
+use crate::io::tsv::tsv_reader;
 use crate::io::writers::get_gz_writer;
 use crate::math::filter_outliers_modified_z_score_with_indices;
 
@@ -664,6 +667,8 @@ pub fn scan_contig(
     Ok(out)
 }
 
+static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
+
 /// Performs a whole-genome scan on a BAM file, counting reads in fixed-size bins and writing
 /// one TSV (gzipped) file per contig.
 ///
@@ -758,10 +763,9 @@ pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Re
                 .with_context(|| format!("failed to create output dir: {out_parent:?}"))?;
 
             let tmp_path = out_parent.join(format!(
-                ".{}.{}.{}.tmp.gz",
-                contig,
+                ".{contig}.{}.{}.tmp.tsv.gz",
                 std::process::id(),
-                std::thread::current().name().unwrap_or("rayon")
+                TMP_COUNTER.fetch_add(1, Ordering::Relaxed),
             ));
 
             let tmp_path_str = tmp_path
@@ -788,6 +792,10 @@ pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Re
                 })?;
             }
 
+            validate_count_file(tmp_path_str, contig, bins.len()).with_context(|| {
+                format!("invalid temp count file before rename: {tmp_path_str}")
+            })?;
+
             std::fs::rename(&tmp_path, &out_file)
                 .with_context(|| format!("failed atomic rename {tmp_path_str} -> {out_file}"))?;
 
@@ -797,6 +805,38 @@ pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Re
     Ok(())
 }
 
+fn validate_count_file(path: &str, expected_contig: &str, expected_rows: usize) -> anyhow::Result<()> {
+    let rdr = get_gz_reader(path)?;
+    let mut tsv = tsv_reader(rdr);
+    let mut rec = csv::ByteRecord::new();
+
+    let mut n = 0usize;
+    while tsv.read_byte_record(&mut rec)
+        .with_context(|| format!("failed reading validation record in {path} around line {}", n + 1))?
+    {
+        n += 1;
+
+        anyhow::ensure!(
+            rec.len() == 12,
+            "{path} line {n}: expected 12 fields, got {}",
+            rec.len()
+        );
+
+        anyhow::ensure!(
+            rec.get(0) == Some(expected_contig.as_bytes()),
+            "{path} line {n}: unexpected contig {:?}, expected {expected_contig}",
+            rec.get(0).map(String::from_utf8_lossy)
+        );
+    }
+
+    anyhow::ensure!(
+        n == expected_rows,
+        "{path}: expected {expected_rows} rows, got {n}"
+    );
+
+    Ok(())
+}
+
 /// Identifies and marks outliers in a slice of `BinCount` objects based on various ratio metrics.
 ///
 /// # Parameters