
better concurrency in job execution

Thomas committed 6 days ago
commit 1ce719023c
5 files changed, 628 insertions(+), 185 deletions(-)
  1. src/callers/mod.rs (+13 -11)
  2. src/callers/nanomonsv.rs (+15 -4)
  3. src/callers/savana.rs (+2 -2)
  4. src/scan/bin.rs (+133 -1)
  5. src/scan/scan.rs (+465 -167)

+ 13 - 11
src/callers/mod.rs

@@ -144,7 +144,8 @@ use crate::{
     commands::longphase::run_phasing_somatic,
     config::Config,
     pipes::{Initialize, InitializeSolo},
-    runners::Run, scan::scan::SomaticScan,
+    runners::Run,
+    scan::scan::SomaticScan,
 };
 
 pub mod clairs;
@@ -225,8 +226,6 @@ pub fn run_somatic_callers(id: &str, config: &Config) -> anyhow::Result<()> {
 
     run_phasing_somatic(id, config)?;
 
-
-
     // If slurm_runner is set, submit jobs in parallel; otherwise run callers sequentially
     if config.slurm_runner {
         let config = Arc::new(config.clone());
@@ -264,9 +263,7 @@ pub fn run_somatic_callers(id: &str, config: &Config) -> anyhow::Result<()> {
             {
                 let config = Arc::clone(&config);
                 let id = Arc::clone(&id);
-                thread::spawn(move || -> anyhow::Result<()> {
-                    DeepSomatic::initialize(&id, &config)?.run()
-                })
+                thread::spawn(move || -> anyhow::Result<()> { run_chunkeds(&id, &config) })
             },
         ];
 
@@ -277,10 +274,17 @@ pub fn run_somatic_callers(id: &str, config: &Config) -> anyhow::Result<()> {
         Severus::initialize(id, config)?.run()?;
         Savana::initialize(id, config)?.run()?;
         NanomonSV::initialize(id, config)?.run()?;
-        // DeepSomatic - somatic SNV/indels
-        DeepSomatic::initialize(id, config)?.run()?;
+
+        run_chunkeds(id, config)?;
     }
 
+    Ok(())
+}
+
+pub fn run_chunkeds(id: &str, config: &Config) -> anyhow::Result<()> {
+    // DeepSomatic - somatic SNV/indels
+    DeepSomatic::initialize(id, config)?.run()?;
+
     // DeepVariant - germline variants for normal sample
     DeepVariant::initialize(id, &config.normal_name, config)?.run()?;
 
@@ -288,9 +292,7 @@ pub fn run_somatic_callers(id: &str, config: &Config) -> anyhow::Result<()> {
     DeepVariant::initialize(id, &config.tumoral_name, config)?.run()?;
 
     // Straglr - Short Tandem Repeat (STR) genotyper
-    Straglr::initialize(id, config)?.run()?;
-
-    Ok(())
+    Straglr::initialize(id, config)?.run()
 }
 
 #[cfg(test)]
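
For readers skimming the diff: below is a minimal sketch of the fan-out/join pattern `run_somatic_callers` now uses when `config.slurm_runner` is set. `Config`, `run_chunkeds`, and the `anyhow` return type come from this repo; the wrapper name `fan_out` and the one-entry job list are illustrative (the real function builds one handle per caller).

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical wrapper, not repo code; the real list has one handle per caller
// (ClairS, Severus, Savana, NanomonSV, plus the chunked DeepSomatic/DeepVariant/Straglr run).
fn fan_out(id: &str, config: &Config) -> anyhow::Result<()> {
    let config = Arc::new(config.clone()); // shared read-only view for all workers
    let id: Arc<str> = Arc::from(id);

    let handles: Vec<thread::JoinHandle<anyhow::Result<()>>> = vec![{
        let (id, config) = (Arc::clone(&id), Arc::clone(&config));
        // DeepSomatic, DeepVariant and Straglr run sequentially inside one thread
        thread::spawn(move || run_chunkeds(&id, &config))
    }];

    // Propagate worker errors; a panic in a worker aborts the pipeline here.
    for handle in handles {
        handle.join().expect("worker thread panicked")?;
    }
    Ok(())
}
```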

+ 15 - 4
src/callers/nanomonsv.rs

@@ -81,12 +81,23 @@ use anyhow::Context;
 use log::{debug, error, info, warn};
 
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
-        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}
-    }, config::Config, helpers::{is_file_older, remove_dir_if_exists}, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, InitializeSolo, ShouldRun, Version}, run, runners::Run, variant::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
+    collection::vcf::Vcf,
+    commands::{
+        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        CapturedOutput, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner,
+    },
+    config::Config,
+    helpers::{is_file_older, remove_dir_if_exists},
+    io::vcf::read_vcf,
+    locker::SampleLock,
+    pipes::{Initialize, InitializeSolo, ShouldRun, Version},
+    run,
+    runners::Run,
+    variant::{
         variant_collection::VariantCollection,
         vcf_variant::{Label, Variants},
-    }
+    },
 };
 
 /// NanomonSV paired (tumor-normal) structural variant caller.

+ 2 - 2
src/callers/savana.rs

@@ -206,7 +206,7 @@ impl LocalRunner for Savana {}
 impl SlurmRunner for Savana {
     fn slurm_args(&self) -> Vec<String> {
         SlurmParams {
-            job_name: Some("savana".into()),
+            job_name: Some(format!("savana_{}", self.id)),
             cpus_per_task: Some(self.config.savana_threads as u32),
             mem: Some("80G".into()),
             partition: Some("mediumq".into()),
@@ -218,7 +218,7 @@ impl SlurmRunner for Savana {
 impl SbatchRunner for Savana {
     fn slurm_params(&self) -> SlurmParams {
         SlurmParams {
-            job_name: Some("savana".into()),
+            job_name: Some(format!("savana_{}", self.id)),
             cpus_per_task: Some(self.config.savana_threads as u32),
             mem: Some("80G".into()),
             partition: Some("mediumq".into()),
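
The only functional change in this file is the Slurm job name: a static `"savana"` becomes `savana_<id>`. A tiny illustrative sketch (the helper name is hypothetical, not repo code) of why this matters on a shared cluster:

```rust
// Hypothetical helper mirroring the format! call above.
fn savana_job_name(sample_id: &str) -> String {
    format!("savana_{sample_id}")
}

fn main() {
    // With the old static name, concurrent samples were indistinguishable in
    // `squeue --name savana`; per-sample names make each submission traceable.
    assert_eq!(savana_job_name("P01"), "savana_P01");
    assert_eq!(savana_job_name("P02"), "savana_P02");
}
```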

+ 133 - 1
src/scan/bin.rs

@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use anyhow::Context;
 use csv::ByteRecord;
 use log::error;
-use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
+use rust_htslib::bam::{HeaderView, IndexedReader, Read, Record, ext::BamRecordExtensions, record::Aux};
 
 use crate::io::{bam::{fb_inv_from_record, primary_record, primary_records}, tsv::{parse_csv_u32_into, parse_u32}};
 
@@ -417,3 +417,135 @@ pub fn parse_bin_record_into<'a>(
     Ok((start, &buf.depths, &buf.lowq))
 }
 
+#[derive(Debug)]
+pub struct BinStats {
+    pub contig: String,
+    pub start: u32,
+    pub end: u32,
+
+    pub n_reads: u32,
+    pub n_sa: u32,
+    pub max_start_count: u32,
+    pub max_end_count: u32,
+
+    pub depths: Vec<u32>,
+    pub low_qualities: Vec<u32>,
+    // per-base fb_inv counts, kept alongside depths and low_qualities
+    pub fb_invs: Vec<u32>,
+}
+
+impl BinStats {
+    #[inline]
+    pub fn mean_coverage_from_depths(&self) -> f64 {
+        if self.depths.is_empty() { return 0.0; }
+        self.depths.iter().sum::<u32>() as f64 / self.depths.len() as f64
+    }
+}
+
+impl BinStats {
+    pub fn from_bam_region(
+        bam_reader: &mut IndexedReader,
+        header: &HeaderView,
+        contig: &str,
+        start: u32,
+        length: u32,
+        min_mapq: u8,
+    ) -> anyhow::Result<Self> {
+        anyhow::ensure!(length > 0, "bin length must be > 0");
+        let end = start + length - 1;
+
+        bam_reader
+            .fetch((contig, start as i64, end as i64))
+            .with_context(|| format!("Failed to fetch region {}:{}-{}", contig, start, end))?;
+
+        let mut depths = vec![0u32; length as usize];
+        let mut low_qualities = vec![0u32; length as usize];
+        let mut fb_invs = vec![0u32; length as usize];
+
+        let mut n_reads: u32 = 0;
+        let mut n_sa: u32 = 0;
+
+        let mut start_counts: HashMap<u32, u32> = HashMap::new();
+        let mut end_counts: HashMap<u32, u32> = HashMap::new();
+
+        for record_result in bam_reader.rc_records() {
+            let rc_record = match record_result {
+                Ok(rc) => rc,
+                Err(e) => {
+                    error!(
+                        "Failed to parse record in {}:{}-{}: {}",
+                        contig, start, end, e
+                    );
+                    continue;
+                }
+            };
+            let record: &Record = rc_record.as_ref();
+
+            // read genomic span (inclusive)
+            let rs_i64 = record.pos();
+            let re_i64 = record.cigar().end_pos(); // exclusive
+            if rs_i64 < 0 || re_i64 <= rs_i64 {
+                continue;
+            }
+            let read_start = rs_i64 as u32;
+            let read_end = (re_i64 as u32).saturating_sub(1);
+
+            if read_end < start || read_start > end {
+                continue;
+            }
+
+            // counts
+            n_reads += 1;
+
+            if matches!(record.aux(b"SA"), Ok(Aux::String(_))) {
+                n_sa += 1;
+            }
+
+            let aln_start = record.reference_start() as u32;
+            let aln_end = record.reference_end() as u32;
+            if (start..=end).contains(&aln_start) {
+                *start_counts.entry(aln_start).or_insert(0) += 1;
+            }
+            if (start..=end).contains(&aln_end) {
+                *end_counts.entry(aln_end).or_insert(0) += 1;
+            }
+
+            // per-record flags (hoisted)
+            let is_lowq = record.mapq() < min_mapq;
+            let has_fbinv =
+                fb_inv_from_record(record, header, min_mapq, Some(150), Some(200)).is_some();
+
+            // per-base accumulation (same semantics as the original chunked scan)
+            let local_start = start.max(read_start);
+            let local_end = end.min(read_end);
+
+            for pos in local_start..=local_end {
+                let i = (pos - start) as usize;
+                depths[i] += 1;
+                if is_lowq {
+                    low_qualities[i] += 1;
+                }
+                if has_fbinv {
+                    fb_invs[i] += 1;
+                }
+            }
+        }
+
+        let max_start_count = start_counts.values().copied().max().unwrap_or(0);
+        let max_end_count = end_counts.values().copied().max().unwrap_or(0);
+
+        Ok(Self {
+            contig: contig.to_string(),
+            start,
+            end,
+            n_reads,
+            n_sa,
+            max_start_count,
+            max_end_count,
+            depths,
+            low_qualities,
+            fb_invs,
+        })
+    }
+}
+
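
A hedged usage sketch of the new `BinStats::from_bam_region` API added above. The BAM path, region, and MAPQ threshold are placeholders, and `BinStats` is assumed in scope from `crate::scan::bin`:

```rust
use anyhow::Result;
use rust_htslib::bam::IndexedReader;

fn summarize_bin() -> Result<()> {
    let mut reader = IndexedReader::from_path("sample.bam")?; // placeholder path
    let header = reader.header().clone(); // HeaderView, as in the scan code

    // 10 kb bin starting at 1,000,000 on chr1; reads with MAPQ < 20 count as low-quality
    let stats = BinStats::from_bam_region(&mut reader, &header, "chr1", 1_000_000, 10_000, 20)?;

    println!(
        "{}:{}-{}  n_reads={}  n_sa={}  mean_cov={:.2}",
        stats.contig,
        stats.start,
        stats.end,
        stats.n_reads,
        stats.n_sa,
        stats.mean_coverage_from_depths()
    );
    Ok(())
}
```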

+ 465 - 167
src/scan/scan.rs

@@ -1,47 +1,99 @@
+//! # Somatic scan: contig-streaming bin counts
+//!
+//! This module performs a whole-genome scan over BAM alignments and summarizes read signals in
+//! fixed-size genomic bins.
+//!
+//! ## Output format (TSV)
+//! Each output line corresponds to one genomic bin and is tab-separated.
+//!
+//! - 11 columns (legacy):
+//!   1. contig
+//!   2. start (0-based, inclusive)
+//!   3. end (0-based, inclusive)
+//!   4. n_reads
+//!   5. coverage
+//!   6. ratio_sa
+//!   7. ratio_start
+//!   8. ratio_end
+//!   9. outlier (comma+space separated labels, or empty)
+//!   10. depths (comma-separated per-base counts)
+//!   11. low_qualities (comma-separated per-base counts)
+//!
+//! - 12 columns (extended, optional):
+//!   12. fb_invs (comma-separated per-base counts)
+//!
+//! Parsers accept either 11 or 12 columns; if the 12th column is absent, `fb_invs` is empty.
+//!
+//! ## Semantics (bitwise-identical mode)
+//!
+//! - **Per-base vectors** (`depths`, `low_qualities`, `fb_invs`) are accumulated **per alignment**
+//!   for each bin it overlaps.
+//! - **`n_reads` and ratios** are computed using a per-bin **qname-deduplicated representative record**:
+//!   the representative is the **last** alignment encountered for a given qname within that bin
+//!   (“last wins”), matching the previous `HashMap` overwrite behavior.
+//! - Ratios are:
+//!   - `ratio_sa`    = (number of representative records with an SA tag) / `n_reads`
+//!   - `ratio_start` = (maximum representative-start pileup at a single position) / `n_reads`
+//!   - `ratio_end`   = (maximum representative-end pileup at a single position) / `n_reads`
+//!
+//! ## Performance notes
+//! - The largest win comes from avoiding thousands of small random indexed reads.
+//! - Parallelism is across contigs; you may want to cap Rayon threads in very high-contig-count
+//!   references or on shared storage.
+//!
+//! ## Key entry points
+//! - [`par_whole_scan`]: scan one BAM (per-sample / per-time-point) and write per-contig TSVs.
+//! - [`somatic_scan`]: run both normal and tumor scans for a sample.
+//!
+use std::collections::HashMap;
 use std::path::Path;
 use std::{fmt, fs, io::Write};
 
 use anyhow::Context;
 use log::{debug, error, info};
-use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator};
-use rayon::{
-    iter::{IntoParallelIterator, ParallelIterator},
-    slice::ParallelSliceMut,
-};
-use rust_htslib::bam::{self, IndexedReader, Read};
+use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use rayon::slice::ParallelSliceMut;
+use rust_htslib::bam::ext::BamRecordExtensions;
+use rust_htslib::bam::{self, IndexedReader, Read, Record};
 
 use crate::helpers::{bam_contigs, get_genome_sizes, is_file_older};
+use crate::io::bam::fb_inv_from_record;
 use crate::io::writers::get_gz_writer;
 use crate::math::filter_outliers_modified_z_score_with_indices;
 
+use crate::config::Config;
 use crate::pipes::{Initialize, ShouldRun};
 use crate::runners::Run;
+use crate::scan::bin::{Bin, BinStats};
 use crate::variant::vcf_variant::Label;
-use crate::{config::Config, scan::bin::Bin};
 
 /// Represents a count of reads in a genomic bin, including various metrics and outlier information.
 #[derive(Debug)]
 pub struct BinCount {
     /// The name of the contig (chromosome) this bin belongs to.
     pub contig: String,
-    /// The start position of the bin in the contig.
+    /// The 0-based inclusive start position of the bin.
     pub start: u32,
-    /// The end position of the bin in the contig.
+    /// The 0-based inclusive end position of the bin.
     pub end: u32,
-    /// The total number of reads in this bin.
+    /// Number of reads in the bin (qname-deduplicated representative count in bitwise-identical mode).
     pub n_reads: u32,
-    /// The average coverage of reads in this bin.
+    /// Mean bin coverage computed from `depths`.
     pub coverage: f64,
-    /// The ratio of supplementary alignments to total reads.
+    /// Ratio of representative reads with SA tag.
     pub ratio_sa: f64,
-    /// The ratio of reads starting in this bin to total reads.
+    /// Ratio of max start pileup (representative reads) to total representative reads.
     pub ratio_start: f64,
-    /// The ratio of reads ending in this bin to total reads.
+    /// Ratio of max end pileup (representative reads) to total representative reads.
     pub ratio_end: f64,
     /// Optional vector of outlier types for this bin.
     pub outlier: Option<Vec<BinOutlier>>,
+    /// Per-base depth over the bin (comma-separated in TSV).
     pub depths: Vec<u32>,
+    /// Per-base low-mapq counts over the bin (comma-separated in TSV).
     pub low_qualities: Vec<u32>,
+    /// Per-base fb_inv counts over the bin (comma-separated in TSV; optional column).
+    pub fb_invs: Vec<u32>,
 }
 
 impl From<&Bin> for BinCount {
@@ -69,18 +121,51 @@ impl From<&Bin> for BinCount {
             outlier: None,
             depths: bin.depths.clone(),
             low_qualities: bin.low_qualities.clone(),
+            fb_invs: bin.fb_invs.clone(),
+        }
+    }
+}
+
+impl From<&BinStats> for BinCount {
+    fn from(bin: &BinStats) -> Self {
+        let n_reads_f = bin.n_reads as f64;
+
+        let (ratio_sa, ratio_start, ratio_end) = if bin.n_reads == 0 {
+            (f64::NAN, f64::NAN, f64::NAN)
+        } else {
+            (
+                bin.n_sa as f64 / n_reads_f,
+                bin.max_start_count as f64 / n_reads_f,
+                bin.max_end_count as f64 / n_reads_f,
+            )
+        };
+
+        Self {
+            contig: bin.contig.clone(),
+            start: bin.start,
+            end: bin.end,
+            n_reads: bin.n_reads,
+            coverage: bin.mean_coverage_from_depths(),
+            ratio_sa,
+            ratio_start,
+            ratio_end,
+            outlier: None,
+            depths: bin.depths.clone(),
+            low_qualities: bin.low_qualities.clone(),
+            fb_invs: bin.fb_invs.clone(),
         }
     }
 }
 
 impl BinCount {
-    /// Converts the `BinCount` instance to a TSV (Tab-Separated Values) row string.
+    /// Converts the `BinCount` instance to a TSV (tab-separated) row.
     ///
-    /// # Returns
-    /// A `String` representing the `BinCount` as a TSV row.
+    /// ## Columns
+    /// Always writes **12 columns** (including `fb_invs` as the last column).
+    /// Downstream parsers may accept 11 columns for legacy files without `fb_invs`.
     pub fn to_tsv_row(&self) -> String {
         format!(
-            "{}\t{}\t{}\t{}\t{:.6}\t{:.6}\t{:.6}\t{:.6}\t{}\t{}\t{}",
+            "{}\t{}\t{}\t{}\t{:.6}\t{:.6}\t{:.6}\t{:.6}\t{}\t{}\t{}\t{}",
             self.contig,
             self.start,
             self.end,
@@ -94,40 +179,43 @@ impl BinCount {
                 .map(|v| v
                     .iter()
                     .map(|e| e.to_string())
-                    .collect::<Vec<String>>()
+                    .collect::<Vec<_>>()
                     .join(", "))
                 .unwrap_or_default(),
             self.depths
                 .iter()
                 .map(|e| e.to_string())
-                .collect::<Vec<String>>()
+                .collect::<Vec<_>>()
                 .join(","),
             self.low_qualities
                 .iter()
                 .map(|e| e.to_string())
-                .collect::<Vec<String>>()
-                .join(",")
+                .collect::<Vec<_>>()
+                .join(","),
+            self.fb_invs
+                .iter()
+                .map(|e| e.to_string())
+                .collect::<Vec<_>>()
+                .join(","),
         )
     }
 
-    /// Parses a TSV row and creates a BinCount object.
+    /// Parses a TSV row and creates a [`BinCount`].
     ///
-    /// # Parameters
-    /// - `row: &str`: A string slice representing a TSV row.
-    ///
-    /// # Returns
-    /// A `Result` containing either a `BinCount` object if parsing is successful,
-    /// or an `anyhow::Error` if parsing fails.
+    /// Accepts either:
+    /// - **11 columns** (legacy; without `fb_invs`)
+    /// - **12 columns** (extended; with `fb_invs`)
     ///
-    /// # Errors
-    /// This function will return an error if:
-    /// - The row does not contain the expected number of fields.
-    /// - Any of the numeric fields fail to parse.
-    /// - The outlier field is not in the expected format.
+    /// ## Errors
+    /// Returns an error if:
+    /// - Column count is not 11 or 12
+    /// - Any numeric field fails to parse
+    /// - Outlier labels are not recognized
     pub fn from_tsv_row(row: &str) -> anyhow::Result<Self> {
         let fields: Vec<&str> = row.split('\t').collect();
-        if fields.len() != 11 {
-            anyhow::bail!("Invalid number of fields in TSV row");
+        // accept rows with or without the optional fb_invs column
+        if fields.len() != 11 && fields.len() != 12 {
+            anyhow::bail!("Invalid number of fields in TSV row (got {})", fields.len());
         }
 
         let outlier = if !fields[8].is_empty() {
@@ -172,6 +260,23 @@ impl BinCount {
                 .collect::<anyhow::Result<Vec<u32>>>()?
         };
 
+        let fb_invs = if fields.len() == 12 {
+            if fields[11].trim().is_empty() {
+                Vec::new()
+            } else {
+                fields[11]
+                    .split(',')
+                    .map(|s| {
+                        s.trim()
+                            .parse::<u32>()
+                            .with_context(|| format!("Failed to parse '{}' as u32", s))
+                    })
+                    .collect::<anyhow::Result<Vec<u32>>>()?
+            }
+        } else {
+            Vec::new()
+        };
+
         Ok(BinCount {
             contig: fields[0].to_string(),
             start: fields[1].parse()?,
@@ -184,6 +289,7 @@ impl BinCount {
             outlier,
             depths,
             low_qualities,
+            fb_invs,
         })
     }
 }
@@ -198,12 +304,6 @@ fn parse_f64_allow_nan(s: &str) -> anyhow::Result<f64> {
     }
 }
 
-// impl GetGenomeRange for BinCount {
-//     fn range(&self) -> GenomeRange {
-//         GenomeRange { contig: contig_to_num(&self.contig), range: self.start..(self.end + 1) }
-//     }
-// }
-
 /// Represents types of outliers that can be detected in a genomic bin.
 #[derive(Debug, Clone)]
 pub enum BinOutlier {
@@ -232,65 +332,306 @@ impl fmt::Display for BinOutlier {
     }
 }
 
-/// Performs a parallel whole genome scan on a BAM file, counting reads in bins across the genome
-/// and identifying outliers. The results are written to TSV files, one per contig.
-///
-/// # Parameters
-/// - `out_dir: &str`: The output directory where results will be saved.
-/// - `bam_path: &str`: The path to the input BAM file.
-/// - `config: &Config`: A configuration object containing the following fields:
-///   - `count_bin_size`: The size of each bin in base pairs.
-///   - `count_n_chunks`: The number of bins per chunk for parallel processing.
-///
-/// # Returns
-/// - `anyhow::Result<()>`: Returns `Ok(())` if successful, or an error if any operation fails.
-///
-/// # Description
-/// This function processes the genome in parallel by dividing each contig into chunks of bins.
-/// Each bin represents a region of the genome, and the function counts reads in these bins.
-/// After processing, bins are sorted, outliers are identified, and results are written to TSV files.
-///
-/// ## Workflow:
-/// 1. **Initialization**:
-///    - Logs the start of the scan with bin size and chunk information.
-///    - Creates the output directory if it does not exist.
-///
-/// 2. **Contig Processing**:
-///    - Reads contigs and their lengths from the dictionary file.
-///    - Skips the mitochondrial chromosome ("chrM").
+// Bin definitions matching the existing bin enumeration (bitwise-identical ranges)
+#[derive(Clone, Copy)]
+struct BinDef {
+    start: u32,
+    end: u32,
+    len: usize,
+}
+
+fn build_bin_defs_grid(
+    contig_len: u32,
+    bin_size: u32,
+    chunk_n_bin: u32,
+) -> (Vec<BinDef>, Vec<Option<usize>>) {
+    let n_bin = contig_len / bin_size; // same arithmetic as the original scan
+    let n_chunks = n_bin.div_ceil(chunk_n_bin); // preserves the original chunk count
+
+    // Grid is the natural bin_size tiling across the contig:
+    let n_grid = contig_len.div_ceil(bin_size) as usize;
+    let mut grid_to_def = vec![None; n_grid];
+
+    let mut defs = Vec::new();
+
+    for i in 0..n_chunks {
+        let chunk_start = i * chunk_n_bin * bin_size;
+
+        let chunk_length = if i == n_chunks - 1 {
+            contig_len - chunk_start
+        } else {
+            chunk_n_bin * bin_size
+        };
+
+        let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
+
+        for j in 0..n_bins_in_chunk {
+            let bin_start = chunk_start + j * bin_size;
+            let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
+            let bin_end = bin_start + bin_length - 1;
+
+            let def_idx = defs.len();
+            defs.push(BinDef {
+                start: bin_start,
+                end: bin_end,
+                len: bin_length as usize,
+            });
+
+            let grid_idx = (bin_start / bin_size) as usize;
+            if grid_idx < grid_to_def.len() {
+                grid_to_def[grid_idx] = Some(def_idx);
+            }
+        }
+    }
+
+    (defs, grid_to_def)
+}
+
+// Per-bin state to preserve semantics
+#[derive(Clone, Copy)]
+struct RepRec {
+    // representative record per qname ("last wins" per your HashMap overwrite)
+    aln_start: u32,
+    aln_end: u32,
+    has_sa: bool,
+}
+
+struct BinState {
+    // alignment-based per-base arrays (bitwise identical to current)
+    depths: Vec<u32>,
+    lowq: Vec<u32>,
+    fb_invs: Vec<u32>,
+
+    // qname-dedup representative (bitwise identical to current reads_store overwrite)
+    reps: HashMap<Vec<u8>, RepRec>,
+}
+
+impl BinState {
+    fn new(len: usize) -> Self {
+        Self {
+            depths: vec![0; len],
+            lowq: vec![0; len],
+            fb_invs: vec![0; len],
+            reps: HashMap::new(),
+        }
+    }
+}
+
+// helper: inclusive read span, matching the original implementation
+#[inline]
+fn read_range_inclusive(rec: &Record) -> Option<(u32, u32)> {
+    let start = rec.pos();
+    let end_excl = rec.cigar().end_pos();
+    if start >= 0 && end_excl > start {
+        Some((start as u32, end_excl as u32 - 1))
+    } else {
+        None
+    }
+}
+
+fn scan_contig(
+    bam_path: &str,
+    contig: &str,
+    contig_len: u32,
+    bin_size: u32,
+    chunk_n_bin: u32,
+    min_mapq: u8,
+) -> anyhow::Result<Vec<BinCount>> {
+    // Build bin definitions exactly like the original code (bitwise-identical ranges),
+    // plus a safe mapping from grid-bin index -> defs index.
+    let (defs, grid_to_def) = build_bin_defs_grid(contig_len, bin_size, chunk_n_bin);
+    if grid_to_def.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let mut bins: Vec<BinState> = defs.iter().map(|d| BinState::new(d.len)).collect();
+
+    let mut reader = IndexedReader::from_path(bam_path)
+        .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
+    let header = reader.header().clone();
+
+    // one fetch per contig (optimized I/O)
+    reader
+        .fetch((contig, 0_i64, contig_len as i64))
+        .with_context(|| format!("Failed to fetch contig {contig}"))?;
+
+    for rr in reader.rc_records() {
+        let rc = match rr {
+            Ok(x) => x,
+            Err(e) => {
+                error!("Failed to parse record in contig {contig}: {e}");
+                continue;
+            }
+        };
+        let rec = rc.as_ref();
+
+        let (read_start, read_end) = match read_range_inclusive(rec) {
+            Some(x) => x,
+            None => continue,
+        };
+
+        // Convert read span to grid-bin indices (safe bounds), then map to defs indices.
+        let grid_first = (read_start / bin_size) as usize;
+        if grid_first >= grid_to_def.len() {
+            continue;
+        }
+        let mut grid_last = (read_end / bin_size) as usize;
+        if grid_last >= grid_to_def.len() {
+            grid_last = grid_to_def.len() - 1;
+        }
+
+        let is_lowq = rec.mapq() < min_mapq;
+        let has_fbinv = fb_inv_from_record(rec, &header, min_mapq, Some(150), Some(200)).is_some();
+        let has_sa = matches!(rec.aux(b"SA"), Ok(rust_htslib::bam::record::Aux::String(_)));
+
+        // Representative fields (same ones used by old count_reads_sa_start_end on reads_store.values())
+        let aln_start = rec.reference_start() as u32;
+        let aln_end = rec.reference_end() as u32;
+
+        // "last wins" representative per qname (same as HashMap overwrite behavior)
+        let qname = rec.qname().to_vec();
+
+        for &def_opt in grid_to_def[grid_first..=grid_last].iter() {
+            let Some(def_idx) = def_opt else {
+                continue;
+            };
+            let def = defs[def_idx];
+
+            if read_end < def.start || read_start > def.end {
+                continue;
+            }
+            let ov_start = read_start.max(def.start);
+            let ov_end = read_end.min(def.end);
+
+            let b = &mut bins[def_idx];
+
+            let off0 = (ov_start - def.start) as usize;
+            let off1 = (ov_end - def.start) as usize;
+            for i in off0..=off1 {
+                b.depths[i] += 1;
+                if is_lowq {
+                    b.lowq[i] += 1;
+                }
+                if has_fbinv {
+                    b.fb_invs[i] += 1;
+                }
+            }
+
+            b.reps.insert(
+                qname.clone(),
+                RepRec {
+                    aln_start,
+                    aln_end,
+                    has_sa,
+                },
+            );
+        }
+    }
+
+    // Materialize BinCount rows in the same order/ranges as defs
+    let mut out = Vec::with_capacity(defs.len());
+
+    for (def_idx, def) in defs.into_iter().enumerate() {
+        let b = &mut bins[def_idx];
+
+        // n_reads is unique-qname count (same as old reads_store.len())
+        let n_reads = b.reps.len() as u32;
+
+        // SA count from representative records (same as old reads_store.values())
+        let n_sa = b.reps.values().filter(|r| r.has_sa).count() as u32;
+
+        // max start/end pileups from representative records (same as old HashMap counting)
+        let mut start_counts: HashMap<u32, u32> = HashMap::new();
+        let mut end_counts: HashMap<u32, u32> = HashMap::new();
+
+        for r in b.reps.values() {
+            if (def.start..=def.end).contains(&r.aln_start) {
+                *start_counts.entry(r.aln_start).or_insert(0) += 1;
+            }
+            if (def.start..=def.end).contains(&r.aln_end) {
+                *end_counts.entry(r.aln_end).or_insert(0) += 1;
+            }
+        }
+
+        let max_start = start_counts.values().copied().max().unwrap_or(0);
+        let max_end = end_counts.values().copied().max().unwrap_or(0);
+
+        // coverage from alignment-based depths vector (same as old mean_coverage_from_depths)
+        let coverage = if b.depths.is_empty() {
+            0.0
+        } else {
+            b.depths.iter().sum::<u32>() as f64 / (b.depths.len() as f64)
+        };
+
+        let n_reads_f = n_reads as f64;
+        let (ratio_sa, ratio_start, ratio_end) = if n_reads == 0 {
+            (f64::NAN, f64::NAN, f64::NAN)
+        } else {
+            (
+                n_sa as f64 / n_reads_f,
+                max_start as f64 / n_reads_f,
+                max_end as f64 / n_reads_f,
+            )
+        };
+
+        out.push(BinCount {
+            contig: contig.to_string(),
+            start: def.start,
+            end: def.end,
+            n_reads,
+            coverage,
+            ratio_sa,
+            ratio_start,
+            ratio_end,
+            outlier: None,
+            depths: std::mem::take(&mut b.depths),
+            low_qualities: std::mem::take(&mut b.lowq),
+            fb_invs: std::mem::take(&mut b.fb_invs),
+        });
+    }
+
+    Ok(out)
+}
+
+/// Performs a whole-genome scan on a BAM file, counting reads in fixed-size bins and writing
+/// one TSV (gzipped) file per contig.
 ///
-/// 3. **Parallel Scanning**:
-///    - For each contig:
-///      - Calculates the number of bins and chunks using ceiling division.
-///      - Processes chunks in parallel using `into_par_iter()`.
-///      - For each chunk:
-///        - Calculates chunk start position and length.
-///        - Processes individual bins within the chunk by creating `Bin` objects and converting them to `BinCount`.
+/// ## Output
+/// For each contig, writes `BinCount` rows to:
+/// - `config.somatic_scan_solo_count_file(id, time_point, contig)`
 ///
-/// 4. **Post-Processing**:
-///    - Sorts bins by their start positions using parallel sorting.
-///    - Identifies outlier bins using a custom function (`fill_outliers`).
+/// Rows are written in ascending bin `start` order (bins are sorted before writing).
+/// TSV rows are produced by [`BinCount::to_tsv_row`].
 ///
-/// 5. **Output**:
-///    - Writes results for each contig to a TSV file in the specified output directory.
+/// ## Parameters
+/// - `id`: sample identifier used to resolve input/output paths from `config`.
+/// - `time_point`: name of the time point (e.g. normal/tumoral label) used to resolve paths.
+/// - `config`: configuration containing:
+///   - `count_bin_size`: bin width (bp)
+///   - `count_n_chunks`: legacy chunking parameter used for **range enumeration** (still affects
+///     which bins exist in bitwise-identical mode)
+///   - `bam_min_mapq`: threshold used for `low_qualities` and `fb_invs` counting
+///   - `somatic_scan_force`: if `true`, outputs are regenerated regardless of timestamps
 ///
-/// ## Notes:
-/// - The function uses ceiling division (`div_ceil`) to handle edge cases in bin and chunk calculations.
-/// - It includes debug logging for various stages of processing.
-/// - Handles edge cases for the last chunk and bin to ensure proper processing of all data.
+/// ## Skipping / re-run logic
+/// Per contig, the output file is skipped if it exists and is not older than the input BAM,
+/// unless `config.somatic_scan_force` is set.
 ///
-/// # Errors
-/// This function will return an error if:
-/// - The output directory cannot be created.
-/// - A `Bin` object cannot be created for a specific region.
-/// - Any I/O operation (e.g., writing results) fails.
+/// ## Errors
+/// Returns an error if:
+/// - the input BAM cannot be opened
+/// - output directories/files cannot be created
+/// - contig scanning fails (fetch/iteration)
+/// - writing the output TSV fails
 pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Result<()> {
-    let bin_size = config.count_bin_size ;
-    let chunk_n_bin = config.count_n_chunks ;
+    let bin_size = config.count_bin_size;
+    let chunk_n_bin = config.count_n_chunks;
     let bam_path = &config.solo_bam(id, time_point);
     let out_dir = config.somatic_scan_solo_output_dir(id, time_point);
 
-    info!("Starting whole genome scan for {bam_path}, with bin size of {bin_size} nt and by chunks of {chunk_n_bin} bins.");
+    info!(
+        "Starting whole genome scan for {bam_path}, bin={bin_size} nt, chunk_n_bin={chunk_n_bin}."
+    );
     fs::create_dir_all(&out_dir)?;
 
     let reader = bam::Reader::from_path(bam_path)
@@ -305,87 +646,44 @@ pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Re
         })
         .collect::<anyhow::Result<_>>()?;
 
-    for (contig, length) in contigs {
-        let out_file = config.somatic_scan_solo_count_file(id, time_point, &contig);
-        // let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
+    contigs
+        .par_iter()
+        .try_for_each(|(contig, length)| -> anyhow::Result<()> {
+            let out_file = config.somatic_scan_solo_count_file(id, time_point, contig);
 
-        // Skip this file if it already exists and is up-to-date compared to the input BAM,
-        // unless forced by the `somatic_scan_force` flag.
-        if !is_file_older(&out_file, bam_path, false).unwrap_or(true) && !config.somatic_scan_force
-        {
-            continue;
-        }
+            if !is_file_older(&out_file, bam_path, false).unwrap_or(true)
+                && !config.somatic_scan_force
+            {
+                return Ok(());
+            }
 
-        let n_bin = length / bin_size;
-        // Calculate number of chunks using ceiling division
-        let n_chunks = n_bin.div_ceil(chunk_n_bin);
-        info!("Scan of contig: {contig}");
-
-        let mut bins: Vec<BinCount> = (0..n_chunks)
-            .into_par_iter()
-            .map_init(
-                || IndexedReader::from_path(bam_path).unwrap(), // per-thread reader
-                |bam_reader, i| {
-                    // .flat_map(|i| {
-                    // Calculate chunk start position
-                    let chunk_start = i * chunk_n_bin * bin_size;
-
-                    // Calculate chunk length
-                    let chunk_length = if i == n_chunks - 1 {
-                        length - chunk_start // Handle last chunk
-                    } else {
-                        chunk_n_bin * bin_size // Standard chunk size
-                    };
-
-                    // Calculate number of bins in this chunk with ceiling division
-                    let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
-
-                    // let mut bam_reader = IndexedReader::from_path(bam_path)
-                    //     .with_context(|| format!("Failed to open BAM file: {}", bam_path))
-                    //     .unwrap();
-
-                    // Process each bin in the chunk
-                    (0..n_bins_in_chunk)
-                        // .into_iter()
-                        .filter_map(|j| {
-                            let bin_start = chunk_start + j * bin_size;
-                            // Ensure we don't exceed remaining length
-                            let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
-                            // debug!("chunk start:{chunk_start}, length: {chunk_length}, n_bins: {n_bins_in_chunk}, first bin start: {bin_start} bin length: {bin_length}");
-                            match Bin::new(
-                                bam_reader,
-                                &contig,
-                                bin_start,
-                                bin_length,
-                                config.bam_min_mapq,
-                            ) {
-                                Ok(bin) => Some(BinCount::from(&bin)),
-                                Err(e) => {
-                                    error!("Failed to get Bin: {e}");
-                                    None
-                                }
-                            }
-                        })
-                        .collect::<Vec<BinCount>>()
-                },
-            )
-            .flatten()
-            .collect();
+            info!("Scan of contig: {contig}");
 
-        debug!("Scan {contig}, sorting bins");
-        bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
+            let mut bins = scan_contig(
+                bam_path,
+                contig,
+                *length,
+                bin_size,
+                chunk_n_bin,
+                config.bam_min_mapq,
+            )?;
 
-        debug!("Scan {contig}, computing outliers");
-        fill_outliers(&mut bins);
+            debug!("Scan {contig}, sorting bins");
+            bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
 
-        debug!("Scan {contig}, writing file");
+            debug!("Scan {contig}, computing outliers");
+            fill_outliers(&mut bins);
+
+            debug!("Scan {contig}, writing file");
+            let mut file = get_gz_writer(&out_file, true)
+                .with_context(|| anyhow::anyhow!("failed to open the file: {out_file}"))?;
+            for bin in bins {
+                writeln!(file, "{}", bin.to_tsv_row())?;
+            }
+
+            Ok(())
+        })?;
 
-        let mut file = get_gz_writer(&out_file, true)
-            .with_context(|| anyhow::anyhow!("failed to open the file: {out_file}"))?;
-        for bin in bins {
-            writeln!(file, "{}", bin.to_tsv_row())?;
-        }
-    }
     Ok(())
 }
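
To make the 11-vs-12-column contract concrete, here is a minimal round-trip sketch (not from the repo's tests; the row values are made up, and `BinCount` is assumed in scope from `crate::scan::scan`). `from_tsv_row` accepts legacy 11-column rows, while `to_tsv_row` always emits the extended 12-column form:

```rust
fn main() -> anyhow::Result<()> {
    // 12-column (extended) row: fb_invs present as the last column
    let extended = "chr1\t0\t2\t42\t30.5\t0.02\t0.10\t0.10\t\t31,30,30\t0,1,0\t0,0,0";
    // 11-column (legacy) row: same bin without fb_invs
    let legacy = "chr1\t0\t2\t42\t30.5\t0.02\t0.10\t0.10\t\t31,30,30\t0,1,0";

    let a = BinCount::from_tsv_row(extended)?;
    assert_eq!(a.fb_invs, vec![0, 0, 0]);

    let b = BinCount::from_tsv_row(legacy)?;
    assert!(b.fb_invs.is_empty()); // missing 12th column -> empty fb_invs

    // Writing always produces the extended 12-column form.
    assert_eq!(a.to_tsv_row().split('\t').count(), 12);
    Ok(())
}
```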