
Working scan: no more memory leaks caused by pileup

Thomas · 8 months ago
commit 0b6f1f3ffe

+ 23 - 38
src/annotation/vep.rs

@@ -692,49 +692,31 @@ pub fn run_vep(in_path: &str, out_path: &str) -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Selects the best Variant Effect Predictor (VEP) annotation from a slice of VEP annotations.
+/// Selects the "best" VEP annotation from a list, based on biological impact and transcript priority.
 ///
-/// This function implements a prioritization algorithm to choose the most relevant VEP annotation
-/// based on impact severity and transcript preference.
+/// # Selection Strategy
+/// 1. **Group by VEP impact** and keep the most severe group
+///    (`HIGH` > `MODERATE` > `LOW` > `MODIFIER`; the `VepImpact` ordering
+///    places `HIGH` first, which is why `min_by_key` selects it).
+///
+/// 2. **If only one annotation in the best-impact group**, return it immediately.
+///
+/// 3. **If multiple remain**, rank by:
+///     - Transcript type: prefer `"NM"` (manually curated) over others
+///     - Accession number (lower is better)
 ///
 /// # Arguments
-/// * `d` - A slice of VEP annotations
+/// * `d` - A slice of `VEP` annotations to choose from.
 ///
 /// # Returns
-/// * `Ok(VEP)` containing the best VEP annotation if successful
-/// * `Err(anyhow::Error)` if no valid VEP annotation can be selected
-///
-/// # Algorithm
-/// 1. Groups VEPs by their impact (HIGH > MODERATE > LOW > MODIFIER)
-/// 2. Selects the group with the highest impact
-/// 3. If there's only one VEP in this group, it's returned
-/// 4. Otherwise, further filtering is done based on transcript identifiers:
-///    - Parses NCBI accessions from the feature field
-///    - Prioritizes 'NM' prefixed accessions over others
-///    - Sorts by accession number (higher numbers are preferred)
+/// * `Ok(VEP)` - The best annotation according to the rules above
+/// * `Err(anyhow::Error)` - If the input is empty or contains no valid features
 ///
 /// # Errors
-/// This function will return an error if:
-/// * The input slice is empty
-/// * No valid NCBI accession can be parsed from the selected VEPs
-///
-/// # Example
-/// ```
-/// let veps = vec![VEP { /* ... */ }, VEP { /* ... */ }];
-/// match get_best_vep(&veps) {
-///     Ok(best_vep) => println!("Best VEP: {:?}", best_vep),
-///     Err(e) => eprintln!("Failed to get best VEP: {}", e),
-/// }
-/// ```
-///
-/// # Note
-/// This function assumes that the VEP annotations are well-formed and contain
-/// the necessary fields for comparison (consequence, feature). It will log warnings
-/// for any parsing errors encountered during the selection process.        
-
-
-
+/// * Returns an error if:
+///     - `d` is empty
+///     - None of the best-impact VEPs have a parsable `feature` field
 pub fn get_best_vep(d: &[VEP]) -> anyhow::Result<VEP> {
+    // Step 1: Group by VEP impact and keep the most severe group
     let best_impact_veps = d
         .iter()
         .chunk_by(|vep| {
@@ -748,12 +730,14 @@ pub fn get_best_vep(d: &[VEP]) -> anyhow::Result<VEP> {
         .into_iter()
         .min_by_key(|(impact, _)| impact.clone())
         .map(|(_, group)| group.cloned().collect::<Vec<_>>())
-        .ok_or_else(|| anyhow!("No element in VEP vector"))?;
+        .ok_or_else(|| anyhow::anyhow!("No element in VEP vector"))?;
 
+    // Step 2: If only one, return it
     if best_impact_veps.len() == 1 {
         return Ok(best_impact_veps[0].clone());
     }
 
+    // Step 3: Rank by NCBI accession priority
     let parsed_veps = best_impact_veps
         .iter()
         .enumerate()
@@ -761,15 +745,16 @@ pub fn get_best_vep(d: &[VEP]) -> anyhow::Result<VEP> {
             vep.feature.as_ref().and_then(|feat| {
                 feat.parse::<NCBIAcc>()
                     .map(|acc| (i, acc))
-                    .map_err(|e| warn!("Can't parse {}: {}", feat, e))
+                    .map_err(|e| warn!("Can't parse NCBI accession '{}': {}", feat, e))
                     .ok()
             })
         })
         .sorted_by_key(|(_, acc)| (Reverse(acc.prefix == "NM"), acc.number))
         .collect::<Vec<_>>();
 
+    // Pick the top-ranked transcript
     parsed_veps
         .first()
         .map(|(k, _)| best_impact_veps[*k].clone())
-        .ok_or_else(|| anyhow!("No valid NCBI accession found"))
+        .ok_or_else(|| anyhow::anyhow!("No valid NCBI accession found"))
 }
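
A minimal usage sketch of the reworked selector (the `VEP` struct's fields and `Debug` derive are assumed from this diff, not shown in full):

```rust
use crate::annotation::vep::{get_best_vep, VEP};

fn pick_annotation(veps: &[VEP]) {
    match get_best_vep(veps) {
        // Wins either on impact alone or on the NM-first,
        // lowest-accession-number tie-break.
        Ok(best) => println!("Best VEP: {best:?}"),
        // Empty input, or no parsable `feature` in the best-impact group.
        Err(e) => eprintln!("No usable VEP annotation: {e}"),
    }
}
```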

+ 8 - 8
src/callers/savana.rs

@@ -20,8 +20,8 @@ use log::info;
 use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use std::{
-    fs::{self, File},
-    io::{self, BufRead},
+    fs::{self},
+    io::BufRead,
     path::Path,
     str::FromStr,
 };
@@ -483,11 +483,12 @@ impl SavanaCN {
                     .context(format!("Failed to parse {name} at line {line_num}"))
             };
 
-            let parse_field_u32 = |field: &str, name: &str, line_num: usize| -> anyhow::Result<u32> {
-                field
-                    .parse()
-                    .context(format!("Failed to parse {name} at line {line_num}"))
-            };
+            let parse_field_u32 =
+                |field: &str, name: &str, line_num: usize| -> anyhow::Result<u32> {
+                    field
+                        .parse()
+                        .context(format!("Failed to parse {name} at line {line_num}"))
+                };
 
             let line_num = index + 1;
 
@@ -523,5 +524,4 @@ impl SavanaCN {
 
         Ok(Self { segments })
     }
-
 }

+ 0 - 2
src/cases/case.rs

@@ -1,5 +1,3 @@
-use std::path::Path;
-
 use rusqlite::{Connection, OptionalExtension};
 
 use crate::config::Config;

+ 0 - 4
src/collection/bam.rs

@@ -670,7 +670,3 @@ pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<Has
     }
     Ok(genome)
 }
-
-// fn softclipped_bases(read: &rust_htslib::bam::Record) -> u128 {
-//     (read.cigar().leading_softclips() + read.cigar().trailing_softclips()) as u128
-// }

+ 87 - 91
src/collection/mod.rs

@@ -2,7 +2,6 @@ use std::{
     collections::HashMap,
     fmt,
     fs::{self, metadata},
-    os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
     thread,
     time::SystemTime,
@@ -27,9 +26,8 @@ use crate::{
     functions::{
         assembler::{Assembler, AssemblerConfig},
         variants::{RunVariantsAgg, VariantsConfig},
-        whole_scan::{WholeScan, WholeScanConfig},
     },
-    runners::Run,
+    runners::Run,
+    scan::scan::{par_whole_scan, par_whole_scan_local},
 };
 
 pub mod bam;
@@ -138,37 +136,6 @@ impl Collections {
             .into_values()
             .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
 
-        // Whole scan
-        for bam in self
-            .bam
-            .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
-        {
-            let config = WholeScanConfig::default();
-            let scan_dir = format!(
-                "{}/{}/{}/{}",
-                &config.result_dir, bam.id, bam.time_point, config.scan_dir
-            );
-            if PathBuf::from(&scan_dir).exists() {
-                let dir_mod: DateTime<Utc> = fs::metadata(&scan_dir)?.modified()?.into();
-                if bam.modified > dir_mod {
-                    fs::remove_dir_all(&scan_dir)?;
-                }
-            }
-
-            if !PathBuf::from(&scan_dir).exists() {
-                tasks.push(CollectionsTasks::WholeScan {
-                    id: bam.id,
-                    time_point: bam.time_point,
-                    bam: bam
-                        .path
-                        .to_str()
-                        .context("Cant convert path to string")?
-                        .to_string(),
-                    config: WholeScanConfig::default(),
-                });
-            }
-        }
-
         // de novo
         // tasks.extend(self.todo_assembler()?);
 
@@ -313,6 +280,38 @@ impl Collections {
         self.tasks = hs.into_values().collect();
     }
 
+    pub fn todo_bam_count(&mut self, config: &Config) -> anyhow::Result<()> {
+        // Whole scan
+        for wgs_bam in self
+            .bam
+            .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
+        {
+            let id = wgs_bam.id.as_str();
+
+            let count_dir = match wgs_bam.time_point.as_str() {
+                "diag" => config.tumoral_dir_count(id),
+                "mrd" => config.normal_dir_count(id),
+                _ => anyhow::bail!("Unknown bam time point {}", wgs_bam.time_point),
+            };
+
+            if PathBuf::from(&count_dir).exists() {
+                let dir_mod: DateTime<Utc> = fs::metadata(&count_dir)?.modified()?.into();
+                if wgs_bam.modified > dir_mod {
+                    fs::remove_dir_all(&count_dir)?;
+                }
+            }
+
+            if !PathBuf::from(&count_dir).exists() {
+                self.tasks.push(CollectionsTasks::CountBam {
+                    bam_path: wgs_bam.path.to_string_lossy().to_string(),
+                    count_dir,
+                    config: config.clone(),
+                });
+            }
+        }
+        Ok(())
+    }
+
     // No pair needed
     pub fn todo_assembler(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
         let mut tasks = Vec::new();
@@ -546,47 +545,47 @@ impl Collections {
     /// * `anyhow::Result<Vec<CollectionsTasks>>` - A Result containing a vector of `CollectionsTasks::Variants`
     ///   if successful, or an error if file metadata cannot be accessed.
     ///
-    pub fn todo_variants_agg(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
-        let mut tasks = Vec::new();
-        let config = VariantsConfig::default();
-        let vcfs_ids = self.vcf.group_by_id();
-        for pair in &self.bam_pairs() {
-            if self.config.id_black_list.contains(&pair.0.id) {
-                continue;
-            }
-            let const_path = format!(
-                "{}/{}/diag/{}_constit.bytes.gz",
-                &config.result_dir, &pair.0.id, &pair.0.id
-            );
-            let constit = Path::new(&const_path);
-
-            if constit.exists() {
-                let vcfs: Vec<_> = vcfs_ids.iter().filter(|(id, _)| id == &pair.0.id).collect();
-                if let Some((_, vcfs)) = vcfs.first() {
-                    let mtime = constit
-                        .metadata()
-                        .context(format!("Can't access file metadata {const_path}."))?
-                        .mtime();
-                    let n_new = vcfs
-                        .iter()
-                        .filter(|vcf| mtime < vcf.file_metadata.mtime())
-                        .count();
-                    if n_new > 0 {
-                        tasks.push(CollectionsTasks::SomaticVariants {
-                            id: pair.0.id.clone(),
-                            config: config.clone(),
-                        });
-                    }
-                }
-            } else {
-                tasks.push(CollectionsTasks::SomaticVariants {
-                    id: pair.0.id.clone(),
-                    config: config.clone(),
-                });
-            }
-        }
-        Ok(tasks)
-    }
+    // pub fn todo_variants_agg(&self) -> anyhow::Result<Vec<CollectionsTasks>> {
+    //     let mut tasks = Vec::new();
+    //     let config = VariantsConfig::default();
+    //     let vcfs_ids = self.vcf.group_by_id();
+    //     for pair in &self.bam_pairs() {
+    //         if self.config.id_black_list.contains(&pair.0.id) {
+    //             continue;
+    //         }
+    //         let const_path = format!(
+    //             "{}/{}/diag/{}_constit.bytes.gz",
+    //             &config.result_dir, &pair.0.id, &pair.0.id
+    //         );
+    //         let constit = Path::new(&const_path);
+    //
+    //         if constit.exists() {
+    //             let vcfs: Vec<_> = vcfs_ids.iter().filter(|(id, _)| id == &pair.0.id).collect();
+    //             if let Some((_, vcfs)) = vcfs.first() {
+    //                 let mtime = constit
+    //                     .metadata()
+    //                     .context(format!("Can't access file metadata {const_path}."))?
+    //                     .mtime();
+    //                 let n_new = vcfs
+    //                     .iter()
+    //                     .filter(|vcf| mtime < vcf.file_metadata.mtime())
+    //                     .count();
+    //                 if n_new > 0 {
+    //                     tasks.push(CollectionsTasks::SomaticVariants {
+    //                         id: pair.0.id.clone(),
+    //                         config: config.clone(),
+    //                     });
+    //                 }
+    //             }
+    //         } else {
+    //             tasks.push(CollectionsTasks::SomaticVariants {
+    //                 id: pair.0.id.clone(),
+    //                 config: config.clone(),
+    //             });
+    //         }
+    //     }
+    //     Ok(tasks)
+    // }
 
     /// Runs all tasks in the collection.
     ///
@@ -710,11 +709,10 @@ impl Collections {
 pub enum CollectionsTasks {
     Align(FlowCellCase),
     DemuxAlign(Vec<FlowCellCase>),
-    WholeScan {
-        id: String,
-        time_point: String,
-        bam: String,
-        config: WholeScanConfig,
+    CountBam {
+        bam_path: String,
+        count_dir: String,
+        config: Config,
     },
     Assemble {
         id: String,
@@ -767,12 +765,11 @@ impl CollectionsTasks {
             CollectionsTasks::NanomonSV { id, .. } => {
                 NanomonSV::initialize(&id, Config::default())?.run()
             }
-            CollectionsTasks::WholeScan {
-                id,
-                time_point,
-                bam,
+            CollectionsTasks::CountBam {
+                bam_path,
+                count_dir,
                 config,
-            } => WholeScan::new(id, time_point, bam, config)?.run(),
+            } => par_whole_scan(&count_dir, &bam_path, &config),
             CollectionsTasks::SomaticVariants { id, config } => {
                 RunVariantsAgg::new(id, config).run()
             }
@@ -791,7 +788,7 @@ impl CollectionsTasks {
             CollectionsTasks::DemuxAlign(_) => 1,
             CollectionsTasks::ModPileup { .. } => 2,
             CollectionsTasks::DMRCDiagMrd { .. } => 3,
-            CollectionsTasks::WholeScan { .. } => 4,
+            CollectionsTasks::CountBam { .. } => 4,
             CollectionsTasks::Assemble { .. } => 5,
             CollectionsTasks::DeepVariant { .. } => 6,
             CollectionsTasks::ClairS { .. } => 7,
@@ -851,12 +848,11 @@ impl fmt::Display for CollectionsTasks {
             NanomonSV { id } => {
                 write!(f, "NanomonSV calling task for {id}")
             }
-            WholeScan {
-                id,
-                bam,
-                time_point,
+            CountBam {
+                bam_path,
+                count_dir,
                 ..
-            } => write!(f, "Whole scan for {} {}, bam: {}", id, time_point, bam),
+            } => write!(f, "Whole bam count for bam: {bam_path} into {count_dir}"),
             SomaticVariants { id, .. } => write!(f, "Variants aggregation for {}", id),
             Assemble { id, time_point, .. } => {
                 write!(f, "De novo assemblage for {} {}", id, time_point)

+ 3 - 2
src/collection/pod5.rs

@@ -740,9 +740,10 @@ impl FlowCellExperiment {
     }
 }
 
+type ExperimentData = (MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>);
 pub fn scan_archive(
     tar_path: &str,
-) -> anyhow::Result<(MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>)> {
+) -> anyhow::Result<ExperimentData> {
     info!("Scanning archive: {tar_path}");
 
     let file = File::open(tar_path)
@@ -794,7 +795,7 @@ pub fn scan_archive(
 
 pub fn scan_local(
     dir: &str,
-) -> anyhow::Result<(MinKnowSampleSheet, Vec<(String, u64, DateTime<Utc>)>)> {
+) -> anyhow::Result<ExperimentData> {
     let mut result = Vec::new();
     let mut sample_sheet: Option<String> = None;
 

+ 3 - 3
src/config.rs

@@ -170,9 +170,9 @@ impl Default for Config {
             modkit_summary_file: "{result_dir}/{id}/{time}/{id}_{time}_5mC_5hmC_summary.txt"
                 .to_string(),
 
-            /// Nanomonsv
-            /// tabix, bgzip, mafft in PATH
-            /// pip install pysam, parasail;  pip install nanomonsv
+            // Nanomonsv
+            // tabix, bgzip, mafft in PATH
+            // pip install pysam, parasail;  pip install nanomonsv
             nanomonsv_bin: "/home/prom/.local/bin/nanomonsv".to_string(),
             nanomonsv_output_dir: "{result_dir}/{id}/{time}/nanomonsv".to_string(),
             nanomonsv_threads: 150,

+ 7 - 9
src/helpers.rs

@@ -1,6 +1,5 @@
 use anyhow::Context;
 use bitcode::{Decode, Encode};
-use chrono::{DateTime, TimeZone, Utc};
 use glob::glob;
 use serde::{Deserialize, Serialize};
 use std::{
@@ -10,7 +9,6 @@ use std::{
     iter::Sum,
     ops::{Add, Div},
     path::{Path, PathBuf},
-    time::{SystemTime, UNIX_EPOCH},
 };
 
 pub fn find_unique_file(dir_path: &str, suffix: &str) -> anyhow::Result<String> {
@@ -65,7 +63,7 @@ pub fn path_prefix(out: &str) -> anyhow::Result<String> {
     Ok(format!("{}/{stem}", out_dir.display()))
 }
 
-pub fn force_or_not(path: &str, force: bool) -> anyhow::Result<()> {
+pub fn force_or_not(_path: &str, _force: bool) -> anyhow::Result<()> {
     // let path = Path::new(path);
     // let mut output_exists = path.exists();
     // let dir = path
@@ -439,12 +437,12 @@ pub fn find_files(pattern: &str) -> Vec<PathBuf> {
     result
 }
 
-fn system_time_to_utc(system_time: SystemTime) -> Option<DateTime<Utc>> {
-    system_time
-        .duration_since(UNIX_EPOCH)
-        .ok()
-        .map(|duration| Utc.timestamp(duration.as_secs() as i64, duration.subsec_nanos()))
-}
+// fn system_time_to_utc(system_time: SystemTime) -> Option<DateTime<Utc>> {
+//     system_time
+//         .duration_since(UNIX_EPOCH)
+//         .ok()
+//         .map(|duration| Utc.timestamp(duration.as_secs() as i64, duration.subsec_nanos()))
+// }
 
 pub fn list_directories(dir_path: &str) -> std::io::Result<Vec<String>> {
     let mut directories = Vec::new();
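
The removed `system_time_to_utc` helper (built on the now-deprecated `Utc.timestamp`) duplicated a conversion chrono already provides: `DateTime<Utc>` implements `From<SystemTime>`, which is exactly what the new `todo_bam_count` code uses via `.into()`. A minimal sketch:

```rust
use chrono::{DateTime, Utc};
use std::time::SystemTime;

fn main() {
    // chrono's From<SystemTime> impl replaces the hand-rolled conversion.
    let now: DateTime<Utc> = SystemTime::now().into();
    println!("{now}");
}
```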

+ 57 - 32
src/lib.rs

@@ -45,7 +45,7 @@ mod tests {
 
     use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
-    use crate::{annotation::Annotation, callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}, savana::{CNSegment, SavanaCN}, Callers}, collection::{bam, pod5::{scan_archive, FlowCells}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado, io::{dict::read_dict, gff::features_ranges}, pipes::somatic::const_stats, positions::{merge_overlapping_genome_ranges, par_overlaps, range_intersection_par, sort_ranges}, scan::scan::{par_whole_scan, somatic_scan}, variant::{variant::AlterationCategory, variants_stats::{self, VariantsStats}}};
+    use crate::{annotation::Annotation, callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}, savana::SavanaCN}, collection::{bam, pod5::{scan_archive, FlowCells}, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado, io::gff::features_ranges, pipes::somatic::const_stats, positions::{merge_overlapping_genome_ranges, range_intersection_par, sort_ranges}, scan::scan::somatic_scan, variant::{variant::AlterationCategory, variants_stats::{self, VariantsStats}}};
 
     // export RUST_LOG="debug"
     fn init() {
@@ -63,7 +63,9 @@ mod tests {
     #[test]
     fn run_dorado() -> anyhow::Result<()> {
         let case = FlowCellCase { 
-            id: "CONSIGNY".to_string(), time_point: "mrd".to_string(), barcode: "07".to_string(), pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".into() };
+            id: "CONSIGNY".to_string(), 
+            time_point: "mrd".to_string(), barcode: "07".to_string(), pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".into() 
+        };
         dorado::Dorado::init(case, Config::default())?.run_pipe()
     }
 
@@ -169,32 +171,32 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn todo_agg() -> anyhow::Result<()> {
-        init();
-        let config = CollectionsConfig::default();
-        info!("Runing todo with config: {:#?}", config);
-        let collections = Collections::new(config)?;
-        let agg_tasks = collections.todo_variants_agg()?;
-        println!("{:#?}", agg_tasks);
-        println!("{}", agg_tasks.len());
-        Ok(())
-    }
+    // #[test]
+    // fn todo_agg() -> anyhow::Result<()> {
+    //     init();
+    //     let config = CollectionsConfig::default();
+    //     info!("Runing todo with config: {:#?}", config);
+    //     let collections = Collections::new(config)?;
+    //     let agg_tasks = collections.todo_variants_agg()?;
+    //     println!("{:#?}", agg_tasks);
+    //     println!("{}", agg_tasks.len());
+    //     Ok(())
+    // }
 
-    #[test]
-    fn run_agg() -> anyhow::Result<()> {
-        init();
-        let config = CollectionsConfig {
-            id_black_list: vec!["MANCUSO".to_string(),"HAMROUNE".to_string()],
-            ..Default::default()
-        };
-        info!("Runing todo with config: {:#?}", config);
-        let mut collections = Collections::new(config)?;
-        collections.tasks = collections.todo_variants_agg()?;
-        collections.run()?;
-        
-        Ok(())
-    }
+    // #[test]
+    // fn run_agg() -> anyhow::Result<()> {
+    //     init();
+    //     let config = CollectionsConfig {
+    //         id_black_list: vec!["MANCUSO".to_string(),"HAMROUNE".to_string()],
+    //         ..Default::default()
+    //     };
+    //     info!("Runing todo with config: {:#?}", config);
+    //     let mut collections = Collections::new(config)?;
+    //     collections.tasks = collections.todo_variants_agg()?;
+    //     collections.run()?;
+    //     
+    //     Ok(())
+    // }
 
     // export RUST_LOG="debug"
     #[test]
@@ -707,8 +709,7 @@ mod tests {
     fn run_somatic_case() -> anyhow::Result<()> {
         init();
         let id = "ADJAGBA";
-        let mut config = Config::default();
-        config.somatic_pipe_force = true;
+        let config = Config { somatic_pipe_force: true, ..Default::default() };
         match Somatic::initialize(id, config)?.run() {
             Ok(_) => (),
             Err(e) => error!("{id} {e}"),
@@ -765,8 +766,7 @@ mod tests {
         let path = format!("{}/{id}/diag/somatic_variants.json.gz", config.result_dir);
         let variants = variant_collection::Variants::load_from_json(&path)?;
 
-        VariantsStats::new(&variants, id, &config)?.save_to_json(&format!("{}/{id}/diag/somatic_variants_stats.json.gz", config.result_dir));
-Ok(())
+        VariantsStats::new(&variants, id, &config)?.save_to_json(&format!("{}/{id}/diag/somatic_variants_stats.json.gz", config.result_dir))
     }
 
     #[test]
@@ -775,7 +775,7 @@ Ok(())
         let id = "ADJAGBA";
         let config = Config::default();
 
-        const_stats(id.to_string(), config);
+        let _ = const_stats(id.to_string(), config);
     }
 
     #[test]
@@ -1025,4 +1025,29 @@ Ok(())
         assert_eq!(result, expected);
     }
 
+    #[test]
+    fn todo_scan() -> anyhow::Result<()> {
+        init();
+        let mut collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        collections.todo_bam_count(&Config::default())?;
+        collections.tasks.iter().for_each(|t| info!("{t}"));
+
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(100)
+            .build()
+            .unwrap();
+
+        pool.install(move || {
+            collections.tasks.into_iter().for_each(|t| {
+                // info!("{t}");
+                if let Err(e) = t.run() {
+                    error!("{e}");
+                }
+            });
+        });
+
+        Ok(())
+    }
 }

+ 2 - 2
src/pipes/somatic.rs

@@ -239,8 +239,8 @@ impl Run for Somatic {
         let config = self.config.clone();
         let id = self.id.clone();
 
-        let result_json = format!("{}/somatic_variants.json.gz", config.tumoral_dir(&id));
-        let result_bit = format!("{}/somatic_variants.bit", config.tumoral_dir(&id));
+        let result_json = format!("{}/{id}_somatic_variants.json.gz", config.tumoral_dir(&id));
+        let result_bit = format!("{}/{id}_somatic_variants.bit", config.tumoral_dir(&id));
 
         if Path::new(&result_json).exists() && !config.somatic_pipe_force {
             return Err(anyhow::anyhow!("already exists"));

+ 1 - 4
src/positions.rs

@@ -1,7 +1,6 @@
 use std::{
     cmp::{max, Ordering},
     fmt::Display,
-    iter::Sum,
     ops::Range,
     str::FromStr,
 };
@@ -705,9 +704,7 @@ pub fn range_intersection_par(a: &[&GenomeRange], b: &[&GenomeRange]) -> Vec<Gen
 
     a_contigs.into_par_iter()
         .filter_map(|(contig, a_start, a_end)| {
-            let Some((b_start, b_end)) = find_contig_indices(&b_contigs, contig) else {
-                return None;
-            };
+            let (b_start, b_end) = find_contig_indices(&b_contigs, contig)?;
 
             let a_ranges = &a[a_start..a_end];
             let b_ranges = &b[b_start..b_end];
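
The `let ... else { return None; }` block collapses to a single `?` because the two forms are equivalent in any function or closure that returns `Option`. A self-contained illustration:

```rust
fn span_len(lookup: Option<(usize, usize)>) -> Option<usize> {
    // Before: let Some((start, end)) = lookup else { return None; };
    let (start, end) = lookup?; // identical early return on None
    Some(end - start)
}

fn main() {
    assert_eq!(span_len(Some((3, 10))), Some(7));
    assert_eq!(span_len(None), None);
}
```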

+ 2 - 0
src/runners.rs

@@ -165,6 +165,8 @@ impl Run for DockerRun {
             } else {
                 "?".to_string()
             };
+
+            #[allow(clippy::zombie_processes)] // not a zombie: the child is waited on via the Wait trait
             let mut child = Command::new("docker")
                 .args(["logs", "--follow", &log_id])
                 .stdout(Stdio::piped())
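
Context for the lint allow: clippy flags spawned children that are never reaped. A standalone sketch of the shape involved, assuming the crate's `Wait` trait ultimately calls `wait()` on the child as the comment states:

```rust
use std::process::{Command, Stdio};

fn follow_logs(log_id: &str) -> std::io::Result<()> {
    let mut child = Command::new("docker")
        .args(["logs", "--follow", log_id])
        .stdout(Stdio::piped())
        .spawn()?;
    // ... stream child.stdout while the container runs ...
    child.wait()?; // reaping the child is what prevents a zombie process
    Ok(())
}
```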

+ 78 - 29
src/scan/bin.rs

@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 
 use anyhow::Context;
-use log::debug;
+use log::error;
 use rust_htslib::bam::{ext::BamRecordExtensions, record::Aux, IndexedReader, Read, Record};
 
 use crate::io::bam::{primary_record, primary_records};
@@ -58,6 +58,72 @@ impl Bin {
         start: u32,
         length: u32,
         min_mapq: u8,
+    ) -> anyhow::Result<Self> {
+        let end = start + length - 1;
+
+        bam_reader
+            .fetch((contig, start as i64, end as i64))
+            .with_context(|| format!("Failed to fetch region {}:{}-{}", contig, start, end))?;
+
+        let mut reads_store: HashMap<Vec<u8>, Record> = HashMap::new();
+        let mut n_low_mapq = 0;
+        let mut depths = vec![0u32; length as usize]; // span-based pseudo-depth (read extents, not per-base alignment)
+
+        for record_result in bam_reader.rc_records() {
+            let rc_record = match record_result {
+                Ok(rc) => rc,
+                Err(e) => {
+                    error!(
+                        "Failed to parse record in {}:{}-{}: {}",
+                        contig, start, end, e
+                    );
+                    continue;
+                }
+            };
+
+            let record = rc_record.as_ref();
+
+            if record.mapq() < min_mapq {
+                n_low_mapq += 1;
+                continue;
+            }
+
+            if let Some((read_start, read_end)) = read_range(record) {
+                if read_end < start || read_start > end {
+                    continue; // Not overlapping this bin
+                }
+
+                // Clone the underlying Record
+                reads_store.insert(record.qname().to_vec(), record.clone());
+
+                // Accumulate span-based depth over the read's overlap with this bin
+                let local_start = start.max(read_start);
+                let local_end = end.min(read_end);
+                for pos in local_start..=local_end {
+                    let i = (pos - start) as usize;
+                    if i < depths.len() {
+                        depths[i] += 1;
+                    }
+                }
+            }
+        }
+
+        Ok(Bin {
+            contig: contig.to_string(),
+            start,
+            end,
+            reads_store,
+            n_low_mapq,
+            depths,
+        })
+    }
+
+    pub fn new_old(
+        bam_reader: &mut IndexedReader,
+        contig: &str,
+        start: u32,
+        length: u32,
+        min_mapq: u8,
     ) -> anyhow::Result<Self> {
         // let mut bam_reader = IndexedReader::from_path(bam_path)
         //     .with_context(|| format!("Can't open BAM file: {}", bam_path))?;
@@ -74,34 +140,6 @@ impl Bin {
 
         let mut reads_store: HashMap<Vec<u8>, Record> = HashMap::new();
         let mut n_low_mapq = 0;
-        // let mut lengths = Vec::new();
-
-        // Process all records in the region
-        // for read_result in bam_reader.records() {
-        //     let record = read_result.context("Error while parsing BAM record")?;
-        //
-        //     // Skip reads with low mapping quality
-        //     if record.mapq() < min_mapq {
-        //         n_low_mapq += 1;
-        //         continue;
-        //     }
-        //
-        //     // Store the read length
-        //     lengths.push(
-        //         record
-        //             .reference_end()
-        //             .saturating_sub(record.reference_start()),
-        //     );
-        //
-        //     // Store the record by query name
-        //     reads_store.insert(record.qname().to_vec(), record);
-        // }
-        // Calculate mean read length, handling the empty case
-        // let reads_mean_len = if lengths.is_empty() {
-        //     0
-        // } else {
-        //     (lengths.iter().sum::<i64>() as f64 / lengths.len() as f64) as u32
-        // };
 
         let mut depths = Vec::new();
         for pileup in bam_reader.pileup() {
@@ -377,3 +415,14 @@ impl Bin {
         self.depths.iter().sum::<u32>() as f64 / self.depths.len() as f64
     }
 }
+
+/// Helper: get start and end position of a read
+fn read_range(record: &Record) -> Option<(u32, u32)> {
+    let start = record.pos();
+    let end = record.cigar().end_pos();
+    if start >= 0 && end > start {
+        Some((start as u32, end as u32 - 1))
+    } else {
+        None
+    }
+}
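
A standalone mirror of `read_range`'s coordinate handling and the bin-overlap test in `Bin::new` above; plain integers stand in for the `rust_htslib` record, whose `pos()` and `end_pos()` return 0-based `i64` coordinates:

```rust
// Plain integers stand in for Record::pos() / Record::cigar().end_pos().
fn read_range_from(start: i64, end: i64) -> Option<(u32, u32)> {
    if start >= 0 && end > start {
        Some((start as u32, end as u32 - 1)) // inclusive end, as in read_range
    } else {
        None
    }
}

// Inclusive-coordinate overlap test, matching the skip condition in Bin::new.
fn overlaps(read: (u32, u32), bin: (u32, u32)) -> bool {
    !(read.1 < bin.0 || read.0 > bin.1)
}

fn main() {
    assert_eq!(read_range_from(100, 250), Some((100, 249)));
    assert!(overlaps((100, 249), (200, 299)));
    assert!(!overlaps((100, 249), (250, 349)));
}
```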

+ 114 - 28
src/scan/scan.rs

@@ -1,3 +1,4 @@
+use std::cell::RefCell;
 use std::{fmt, fs, io::Write};
 
 use anyhow::Context;
@@ -273,52 +274,136 @@ pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow:
         info!("Scan of contig: {contig}");
 
         let mut bins: Vec<BinCount> = (0..n_chunks)
+            .into_par_iter()
+            .map_init(
+                || IndexedReader::from_path(bam_path).unwrap(), // per-thread reader
+                |mut bam_reader, i| {
+                    // Calculate chunk start position
+                    let chunk_start = i * chunk_n_bin * bin_size;
+
+                    // Calculate chunk length
+                    let chunk_length = if i == n_chunks - 1 {
+                        length - chunk_start // Handle last chunk
+                    } else {
+                        chunk_n_bin * bin_size // Standard chunk size
+                    };
+
+                    // Calculate number of bins in this chunk with ceiling division
+                    let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
+
+                    // Process each bin in the chunk
+                    (0..n_bins_in_chunk)
+                        .filter_map(|j| {
+                            let bin_start = chunk_start + j * bin_size;
+                            // Ensure we don't exceed remaining length
+                            let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
+                            // debug!("chunk start:{chunk_start}, length: {chunk_length}, n_bins: {n_bins_in_chunk}, first bin start: {bin_start} bin length: {bin_length}");
+                            match Bin::new(
+                                &mut bam_reader,
+                                &contig,
+                                bin_start,
+                                bin_length,
+                                config.bam_min_mapq,
+                            ) {
+                                Ok(bin) => Some(BinCount::from(&bin)),
+                                Err(e) => {
+                                    error!("Failed to get Bin: {e}");
+                                    None
+                                }
+                            }
+                        })
+                        .collect::<Vec<BinCount>>()
+                },
+            )
+            .flatten()
+            .collect();
+
+        debug!("Scan {contig}, sorting bins");
+        bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
+
+        debug!("Scan {contig}, computing outliers");
+        fill_outliers(&mut bins);
+
+        let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
+        debug!("Scan {contig}, writing file");
+
+        let mut file = get_gz_writer(&out_file, true)
+            .with_context(|| anyhow::anyhow!("failed to open the file: {out_file}"))?;
+        for bin in bins {
+            writeln!(file, "{}", bin.to_tsv_row())?;
+        }
+    }
+    Ok(())
+}
+
+thread_local! {
+    static BAM_READER: RefCell<Option<IndexedReader>> = const { RefCell::new(None) };
+}
+
+pub fn par_whole_scan_local(out_dir: &str, bam_path: &str, config: &Config) -> anyhow::Result<()> {
+    let bin_size = config.count_bin_size;
+    let chunk_n_bin = config.count_n_chunks;
+    info!("Starting whole genome scan for {bam_path}, with bin size of {bin_size} nt and by chunks of {chunk_n_bin} bins.");
+    fs::create_dir_all(out_dir)?;
+
+    for (contig, length) in read_dict(&config.dict_file)? {
+        let n_bin = length / bin_size;
+        let n_chunks = n_bin.div_ceil(chunk_n_bin);
+        info!("Scan of contig: {contig}");
+
+        let bins: Vec<BinCount> = (0..n_chunks)
             .into_par_iter()
             .flat_map(|i| {
-                // Calculate chunk start position
                 let chunk_start = i * chunk_n_bin * bin_size;
-
-                // Calculate chunk length
                 let chunk_length = if i == n_chunks - 1 {
-                    length - chunk_start // Handle last chunk
+                    length - chunk_start
                 } else {
-                    chunk_n_bin * bin_size // Standard chunk size
+                    chunk_n_bin * bin_size
                 };
-
-                // Calculate number of bins in this chunk with ceiling division
                 let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
 
-                let mut bam_reader = IndexedReader::from_path(bam_path)
-                    .with_context(|| format!("Failed to open BAM file: {}", bam_path))
-                    .unwrap();
+                // Use thread-local BAM reader
+                let result = BAM_READER.with(|reader_cell| {
+                    let mut reader_ref = reader_cell.borrow_mut();
 
-                // Process each bin in the chunk
-                (0..n_bins_in_chunk)
-                    // .into_iter()
-                    .filter_map(|j| {
+                    // Initialize if not already set
+                    if reader_ref.is_none() {
+                        let reader = IndexedReader::from_path(bam_path)
+                            .with_context(|| format!("Failed to open BAM file: {}", bam_path))
+                            .ok()?; // handle error as Option
+                        *reader_ref = Some(reader);
+                    }
+
+                    let reader = reader_ref.as_mut().unwrap();
+
+                    // Seek to contig start for this chunk
+                    let mut bins_in_chunk = Vec::new();
+                    for j in 0..n_bins_in_chunk {
                         let bin_start = chunk_start + j * bin_size;
-                        // Ensure we don't exceed remaining length
                         let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
-                        // debug!("chunk start:{chunk_start}, length: {chunk_length}, n_bins: {n_bins_in_chunk}, first bin start: {bin_start} bin length: {bin_length}");
-                        match Bin::new(
-                            &mut bam_reader,
-                            &contig,
-                            bin_start,
-                            bin_length,
-                            config.bam_min_mapq,
-                        ) {
-                            Ok(bin) => Some(BinCount::from(&bin)),
+                        match Bin::new(reader, &contig, bin_start, bin_length, config.bam_min_mapq)
+                        {
+                            Ok(bin) => bins_in_chunk.push(BinCount::from(&bin)),
                             Err(e) => {
-                                error!("Failed to get Bin: {e}");
-                                None
+                                error!("Failed to get Bin at chunk {i} bin {j}: {e}");
                             }
                         }
-                    })
-                    .collect::<Vec<BinCount>>()
+                    }
+                    Some(bins_in_chunk)
+                });
+
+                result.into_iter().flatten().collect::<Vec<_>>()
             })
             .collect();
 
         debug!("Scan {contig}, sorting bins");
+        let mut bins = bins;
         bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
 
         debug!("Scan {contig}, computing outliers");
@@ -333,6 +418,7 @@ pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow:
             writeln!(file, "{}", bin.to_tsv_row())?;
         }
     }
+
     Ok(())
 }
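
The core of the rework is rayon's `map_init`, which constructs one `IndexedReader` per worker thread instead of one per chunk. A minimal self-contained sketch of the mechanism, with a `String` standing in for the reader:

```rust
use rayon::prelude::*;

fn main() {
    let results: Vec<usize> = (0..8u32)
        .into_par_iter()
        .map_init(
            || String::from("per-thread resource"), // runs once per rayon worker
            |resource, i| resource.len() + i as usize, // resource: &mut String
        )
        .collect();
    assert_eq!(results.len(), 8);
}
```

`par_whole_scan_local` below reaches the same goal with a `thread_local!` slot instead of rayon's init hook; the `map_init` resource lives only for the duration of the parallel call, while the thread-local reader persists across calls on the same thread.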
 

+ 5 - 15
src/variant/variant_collection.rs

@@ -1,5 +1,5 @@
 use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{HashMap, HashSet},
     fs::{self, File},
     io::{Read, Write},
     path::Path,
@@ -9,9 +9,7 @@ use anyhow::Context;
 use bgzip::{BGZFReader, BGZFWriter};
 use bitcode::{Decode, Encode};
 use csv::ReaderBuilder;
-use dashmap::DashMap;
 use log::{debug, error, info, warn};
-use ordered_float::OrderedFloat;
 use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
@@ -22,23 +20,16 @@ use crate::{
         cosmic::Cosmic,
         echtvar::{parse_echtvar_val, run_echtvar},
         gnomad::GnomAD,
-        vep::{get_best_vep, run_vep, VepImpact, VepLine, VEP},
+        vep::{get_best_vep, run_vep, VepLine, VEP},
         Annotation, Annotations,
     },
     collection::{
         bam::{counts_at, counts_ins_at},
         vcf::Vcf,
     },
-    config::Config,
-    helpers::{app_storage_dir, bin_data, estimate_shannon_entropy, mean, temp_file_path, Hash128},
-    io::{
-        dict::read_dict, fasta::sequence_at, gff::features_ranges, readers::get_reader,
-        vcf::vcf_header,
-    },
-    positions::{
-        merge_overlapping_genome_ranges, par_overlaps, GenomePosition, GenomeRange,
-        GetGenomePosition,
-    },
+    helpers::{app_storage_dir, estimate_shannon_entropy, mean, temp_file_path, Hash128},
+    io::{fasta::sequence_at, readers::get_reader, vcf::vcf_header},
+    positions::{GenomePosition, GetGenomePosition},
 };
 
 /// A collection of VCF variants along with associated metadata.
@@ -794,7 +785,6 @@ impl Variants {
     }
 }
 
-
 /// Creates a new Variant instance from a collection of VcfVariants and annotations.
 ///
 /// This function consolidates information from one or more VcfVariants into a single Variant,

+ 34 - 22
src/variant/variants_stats.rs

@@ -12,7 +12,10 @@ use crate::{
     config::Config,
     helpers::bin_data,
     io::{dict::read_dict, gff::features_ranges, readers::get_gz_reader, writers::get_gz_writer},
-    positions::{contig_to_num, merge_overlapping_genome_ranges, par_overlaps, range_intersection_par, GenomeRange},
+    positions::{
+        contig_to_num, merge_overlapping_genome_ranges, par_overlaps, range_intersection_par,
+        GenomeRange,
+    },
     scan::scan::BinCount,
 };
 
@@ -64,7 +67,7 @@ where
 }
 
 impl VariantsStats {
-    pub fn new(variants: &Variants, id:&str, config: &Config) -> anyhow::Result<Self> {
+    pub fn new(variants: &Variants, id: &str, config: &Config) -> anyhow::Result<Self> {
         let n = variants.data.len() as u32;
         let alteration_categories: DashMap<String, u32> = DashMap::new();
         let vep_impact: DashMap<String, u32> = DashMap::new();
@@ -145,19 +148,21 @@ impl VariantsStats {
             })
             .collect();
 
-        let exon_ranges = features_ranges("exon", &config)?;
+        let exon_ranges = features_ranges("exon", config)?;
         let exon_ranges = merge_overlapping_genome_ranges(&exon_ranges);
 
-        let all_somatic_rates = somatic_rates(&variants.data, &exon_ranges, &config)?;
+        let all_somatic_rates = somatic_rates(&variants.data, &exon_ranges, config)?;
 
-        let mut high_depth_ranges = high_depth_somatic(id, &config)?;
-        high_depth_ranges.par_sort_by_key(|r| ( r.contig, r.range.start ));
+        let mut high_depth_ranges = high_depth_somatic(id, config)?;
+        high_depth_ranges.par_sort_by_key(|r| (r.contig, r.range.start));
 
         let exon_ranges_ref: Vec<&GenomeRange> = exon_ranges.iter().collect();
-        let exons_high_depth = range_intersection_par(&high_depth_ranges.iter().collect::<Vec<&GenomeRange>>(), &exon_ranges_ref);
-
-        let high_depth = somatic_rates(&variants.data, &exons_high_depth, &config)?;
+        let exons_high_depth = range_intersection_par(
+            &high_depth_ranges.iter().collect::<Vec<&GenomeRange>>(),
+            &exon_ranges_ref,
+        );
 
+        let high_depth = somatic_rates(&variants.data, &exons_high_depth, config)?;
 
         Ok(Self {
             n,
@@ -173,7 +178,7 @@ impl VariantsStats {
             genes_nonsynonymus,
             n_gnomad,
             somatic_rates: all_somatic_rates,
-            high_depth_somatic_rates: high_depth
+            high_depth_somatic_rates: high_depth,
         })
     }
 
@@ -385,38 +390,45 @@ pub fn high_depth_somatic(id: &str, config: &Config) -> anyhow::Result<Vec<Genom
     Ok(results.into_iter().flatten().collect())
 }
 
+/// Converts a slice of booleans into a list of `GenomeRange`s representing
+/// consecutive `true` values, offset by a `start` position and tagged with a contig ID.
+///
+/// # Arguments
+/// - `vec`: A slice of booleans (`true` means "active" position)
+/// - `start`: The offset to add to all index positions (e.g., bin or genomic start position)
+/// - `contig`: The contig name (e.g., "chr1", "chrX") to be mapped to a numerical ID
+///
+/// # Returns
+/// A vector of `GenomeRange` objects, each corresponding to a consecutive sequence of `true` values.
 fn ranges_from_consecutive_true(vec: &[bool], start: u32, contig: &str) -> Vec<GenomeRange> {
     let contig = contig_to_num(contig);
     let mut ranges = Vec::new();
     let mut current_start: Option<u32> = None;
 
-    // Iterate through elements starting from specified position
-    for (i, &value) in vec.iter().enumerate() {
-        let i = i as u32 + start;
-        match (value, current_start) {
-            // Begin new range
-            (true, None) => current_start = Some(i),
-            // Finalize current range
+    for (idx, &is_true) in vec.iter().enumerate() {
+        let i = idx as u32 + start;
+
+        match (is_true, current_start) {
+            (true, None) => current_start = Some(i), // Start new range
             (false, Some(start_idx)) => {
-                // ranges.push(start_idx..i);
                 ranges.push(GenomeRange {
                     contig,
                     range: start_idx..i,
                 });
                 current_start = None;
             }
-            _ => {}
+            _ => {} // Continue range or stay idle
         }
     }
 
-    // Add any remaining active range
+    // If the final values were true, we need to flush the last range
     if let Some(start_idx) = current_start {
-        // ranges.push(start_idx..vec.len() as u32);
         ranges.push(GenomeRange {
             contig,
-            range: start_idx..vec.len() as u32 + start,
+            range: start_idx..(vec.len() as u32 + start),
         });
     }
 
     ranges
 }
+
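
A worked example for the rewritten helper, as a test-style sketch (it relies on the crate's `GenomeRange` and `contig_to_num`, so it is illustrative rather than standalone):

```rust
#[test]
fn consecutive_true_ranges() {
    let mask = [false, true, true, false, true];
    let ranges = ranges_from_consecutive_true(&mask, 10, "chr1");
    assert_eq!(ranges.len(), 2);
    assert_eq!(ranges[0].range, 11..13); // indices 1..3, offset by start = 10
    assert_eq!(ranges[1].range, 14..15); // trailing run of true flushed after the loop
}
```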