|
|
@@ -1,4 +1,3 @@
|
|
|
-use std::cell::RefCell;
|
|
|
use std::{fmt, fs, io::Write};
|
|
|
|
|
|
use anyhow::Context;
|
|
|
@@ -10,9 +9,12 @@ use rayon::{
|
|
|
};
|
|
|
use rust_htslib::bam::IndexedReader;
|
|
|
|
|
|
+use crate::collection::{Initialize, ShouldRun};
|
|
|
+use crate::helpers::is_file_older;
|
|
|
use crate::io::writers::get_gz_writer;
|
|
|
use crate::math::filter_outliers_modified_z_score_with_indices;
|
|
|
|
|
|
+use crate::runners::Run;
|
|
|
use crate::{config::Config, io::dict::read_dict, scan::bin::Bin};
|
|
|
|
|
|
/// Represents a count of reads in a genomic bin, including various metrics and outlier information.
|
|
|
@@ -261,13 +263,25 @@ impl fmt::Display for BinOutlier {
|
|
|
/// - The dictionary file cannot be read.
|
|
|
/// - A `Bin` object cannot be created for a specific region.
|
|
|
/// - Any I/O operation (e.g., writing results) fails.
|
|
|
-pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow::Result<()> {
|
|
|
+pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Result<()> {
|
|
|
let bin_size = config.count_bin_size;
|
|
|
let chunk_n_bin = config.count_n_chunks;
|
|
|
+ let bam_path = &config.solo_bam(id, time_point);
|
|
|
+ let out_dir = config.somatic_scan_solo_output_dir(id, time_point);
|
|
|
+
|
|
|
info!("Starting whole genome scan for {bam_path}, with bin size of {bin_size} nt and by chunks of {chunk_n_bin} bins.");
|
|
|
- fs::create_dir_all(out_dir)?;
|
|
|
+ fs::create_dir_all(&out_dir)?;
|
|
|
|
|
|
for (contig, length) in read_dict(&config.dict_file)? {
|
|
|
+ let out_file = config.somatic_scan_solo_count_file(id, time_point, &contig);
|
|
|
+ // let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
|
|
|
+
|
|
|
+ // Skip this file if it already exists and is up-to-date compared to the input BAM,
|
|
|
+ // unless forced by the `somatic_scan_force` flag.
|
|
|
+ if !is_file_older(&out_file, bam_path).unwrap_or(true) && !config.somatic_scan_force {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
let n_bin = length / bin_size;
|
|
|
// Calculate number of chunks using ceiling division
|
|
|
let n_chunks = n_bin.div_ceil(chunk_n_bin);
|
|
|
@@ -321,7 +335,7 @@ pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow:
|
|
|
.collect::<Vec<BinCount>>()
|
|
|
},
|
|
|
)
|
|
|
- .flatten()
|
|
|
+ .flatten()
|
|
|
.collect();
|
|
|
|
|
|
debug!("Scan {contig}, sorting bins");
|
|
|
@@ -330,7 +344,6 @@ pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow:
|
|
|
debug!("Scan {contig}, computing outliers");
|
|
|
fill_outliers(&mut bins);
|
|
|
|
|
|
- let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
|
|
|
debug!("Scan {contig}, writing file");
|
|
|
|
|
|
let mut file = get_gz_writer(&out_file, true)
|
|
|
@@ -342,85 +355,92 @@ pub fn par_whole_scan(out_dir: &str, bam_path: &str, config: &Config) -> anyhow:
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
-thread_local! {
|
|
|
- static BAM_READER: RefCell<Option<IndexedReader>> = const { RefCell::new(None) };
|
|
|
-}
|
|
|
-
|
|
|
-pub fn par_whole_scan_local(out_dir: &str, bam_path: &str, config: &Config) -> anyhow::Result<()> {
|
|
|
- let bin_size = config.count_bin_size;
|
|
|
- let chunk_n_bin = config.count_n_chunks;
|
|
|
- info!("Starting whole genome scan for {bam_path}, with bin size of {bin_size} nt and by chunks of {chunk_n_bin} bins.");
|
|
|
- fs::create_dir_all(out_dir)?;
|
|
|
-
|
|
|
- for (contig, length) in read_dict(&config.dict_file)? {
|
|
|
- let n_bin = length / bin_size;
|
|
|
- let n_chunks = n_bin.div_ceil(chunk_n_bin);
|
|
|
- info!("Scan of contig: {contig}");
|
|
|
-
|
|
|
- let bins: Vec<BinCount> = (0..n_chunks)
|
|
|
- .into_par_iter()
|
|
|
- .flat_map(|i| {
|
|
|
- let chunk_start = i * chunk_n_bin * bin_size;
|
|
|
- let chunk_length = if i == n_chunks - 1 {
|
|
|
- length - chunk_start
|
|
|
- } else {
|
|
|
- chunk_n_bin * bin_size
|
|
|
- };
|
|
|
- let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
|
|
|
-
|
|
|
- // Use thread-local BAM reader
|
|
|
- let result = BAM_READER.with(|reader_cell| {
|
|
|
- let mut reader_ref = reader_cell.borrow_mut();
|
|
|
-
|
|
|
- // Initialize if not already set
|
|
|
- if reader_ref.is_none() {
|
|
|
- let reader = IndexedReader::from_path(bam_path)
|
|
|
- .with_context(|| format!("Failed to open BAM file: {}", bam_path))
|
|
|
- .ok()?; // handle error as Option
|
|
|
- *reader_ref = Some(reader);
|
|
|
- }
|
|
|
-
|
|
|
- let reader = reader_ref.as_mut().unwrap();
|
|
|
-
|
|
|
- // Seek to contig start for this chunk
|
|
|
- let mut bins_in_chunk = Vec::new();
|
|
|
- for j in 0..n_bins_in_chunk {
|
|
|
- let bin_start = chunk_start + j * bin_size;
|
|
|
- let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
|
|
|
- match Bin::new(reader, &contig, bin_start, bin_length, config.bam_min_mapq)
|
|
|
- {
|
|
|
- Ok(bin) => bins_in_chunk.push(BinCount::from(&bin)),
|
|
|
- Err(e) => {
|
|
|
- error!("Failed to get Bin at chunk {i} bin {j}: {e}");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- Some(bins_in_chunk)
|
|
|
- });
|
|
|
-
|
|
|
- result.into_iter().flatten().collect::<Vec<_>>()
|
|
|
- })
|
|
|
- .collect();
|
|
|
-
|
|
|
- debug!("Scan {contig}, sorting bins");
|
|
|
- let mut bins = bins;
|
|
|
- bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
|
|
|
-
|
|
|
- debug!("Scan {contig}, computing outliers");
|
|
|
- fill_outliers(&mut bins);
|
|
|
-
|
|
|
- let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
|
|
|
- debug!("Scan {contig}, writing file");
|
|
|
-
|
|
|
- let mut file = get_gz_writer(&out_file, true)
|
|
|
- .with_context(|| anyhow::anyhow!("failed to open the file: {out_file}"))?;
|
|
|
- for bin in bins {
|
|
|
- writeln!(file, "{}", bin.to_tsv_row())?;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Ok(())
|
|
|
-}
|
|
|
+// thread_local! {
|
|
|
+// static BAM_READER: RefCell<Option<IndexedReader>> = const { RefCell::new(None) };
|
|
|
+// }
|
|
|
+//
|
|
|
+// pub fn par_whole_scan_local(out_dir: &str, bam_path: &str, config: &Config) -> anyhow::Result<()> {
|
|
|
+// let bin_size = config.count_bin_size;
|
|
|
+// let chunk_n_bin = config.count_n_chunks;
|
|
|
+// info!("Starting whole genome scan for {bam_path}, with bin size of {bin_size} nt and by chunks of {chunk_n_bin} bins.");
|
|
|
+// fs::create_dir_all(out_dir)?;
|
|
|
+//
|
|
|
+// for (contig, length) in read_dict(&config.dict_file)? {
|
|
|
+// let out_file = format!("{out_dir}/{contig}_count.tsv.gz");
|
|
|
+//
|
|
|
+// // Skip this file if it already exists and is up-to-date compared to the input BAM,
|
|
|
+// // unless forced by the `somatic_scan_force` flag.
|
|
|
+// if !is_file_older(&out_file, bam_path).unwrap_or(true) && !config.somatic_scan_force {
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+//
|
|
|
+// let n_bin = length / bin_size;
|
|
|
+// let n_chunks = n_bin.div_ceil(chunk_n_bin);
|
|
|
+// info!("Scan of contig: {contig}");
|
|
|
+//
|
|
|
+// let bins: Vec<BinCount> = (0..n_chunks)
|
|
|
+// .into_par_iter()
|
|
|
+// .flat_map(|i| {
|
|
|
+// let chunk_start = i * chunk_n_bin * bin_size;
|
|
|
+// let chunk_length = if i == n_chunks - 1 {
|
|
|
+// length - chunk_start
|
|
|
+// } else {
|
|
|
+// chunk_n_bin * bin_size
|
|
|
+// };
|
|
|
+// let n_bins_in_chunk = chunk_length.div_ceil(bin_size);
|
|
|
+//
|
|
|
+// // Use thread-local BAM reader
|
|
|
+// let result = BAM_READER.with(|reader_cell| {
|
|
|
+// let mut reader_ref = reader_cell.borrow_mut();
|
|
|
+//
|
|
|
+// // Initialize if not already set
|
|
|
+// if reader_ref.is_none() {
|
|
|
+// let reader = IndexedReader::from_path(bam_path)
|
|
|
+// .with_context(|| format!("Failed to open BAM file: {}", bam_path))
|
|
|
+// .ok()?; // handle error as Option
|
|
|
+// *reader_ref = Some(reader);
|
|
|
+// }
|
|
|
+//
|
|
|
+// let reader = reader_ref.as_mut().unwrap();
|
|
|
+//
|
|
|
+// // Seek to contig start for this chunk
|
|
|
+// let mut bins_in_chunk = Vec::new();
|
|
|
+// for j in 0..n_bins_in_chunk {
|
|
|
+// let bin_start = chunk_start + j * bin_size;
|
|
|
+// let bin_length = std::cmp::min(bin_size, chunk_length - j * bin_size);
|
|
|
+// match Bin::new(reader, &contig, bin_start, bin_length, config.bam_min_mapq)
|
|
|
+// {
|
|
|
+// Ok(bin) => bins_in_chunk.push(BinCount::from(&bin)),
|
|
|
+// Err(e) => {
|
|
|
+// error!("Failed to get Bin at chunk {i} bin {j}: {e}");
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// Some(bins_in_chunk)
|
|
|
+// });
|
|
|
+//
|
|
|
+// result.into_iter().flatten().collect::<Vec<_>>()
|
|
|
+// })
|
|
|
+// .collect();
|
|
|
+//
|
|
|
+// debug!("Scan {contig}, sorting bins");
|
|
|
+// let mut bins = bins;
|
|
|
+// bins.par_sort_unstable_by(|a, b| a.start.cmp(&b.start));
|
|
|
+//
|
|
|
+// debug!("Scan {contig}, computing outliers");
|
|
|
+// fill_outliers(&mut bins);
|
|
|
+//
|
|
|
+// debug!("Scan {contig}, writing file");
|
|
|
+//
|
|
|
+// let mut file = get_gz_writer(&out_file, true)
|
|
|
+// .with_context(|| anyhow::anyhow!("failed to open the file: {out_file}"))?;
|
|
|
+// for bin in bins {
|
|
|
+// writeln!(file, "{}", bin.to_tsv_row())?;
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// Ok(())
|
|
|
+// }
|
|
|
|
|
|
/// Identifies and marks outliers in a slice of `BinCount` objects based on various ratio metrics.
|
|
|
///
|
|
|
@@ -541,3 +561,96 @@ pub fn somatic_scan(id: &str, config: &Config) -> anyhow::Result<()> {
|
|
|
config,
|
|
|
)
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+/// A pipeline runner for executing SomaticScan on matched tumor and normal samples.
|
|
|
+///
|
|
|
+/// This struct encapsulates:
|
|
|
+/// - Initialization and conditional cleanup of prior outputs
|
|
|
+/// - Logic for checking whether a re-run is necessary (based on BAM and output timestamps)
|
|
|
+/// - Coordinated parallel scanning of both normal and tumoral inputs
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct SomaticScan {
|
|
|
+ id: String,
|
|
|
+ config: Config,
|
|
|
+}
|
|
|
+
|
|
|
+impl Initialize for SomaticScan {
|
|
|
+ /// Initializes a SomaticScan runner.
|
|
|
+ ///
|
|
|
+ /// If force is enabled in the config, both the normal and tumoral output directories
|
|
|
+ /// are deleted to ensure a clean re-run.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ /// * `id` - The sample ID for the scan
|
|
|
+ /// * `config` - Configuration for input/output paths and behavior
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// A fully initialized `SomaticScan` instance ready for execution
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ /// Returns an error if directory deletion fails during force cleanup.
|
|
|
+ fn initialize(id: &str, config: Config) -> anyhow::Result<Self> {
|
|
|
+ info!("Initialize SomaticScan for {id}.");
|
|
|
+
|
|
|
+ let somatic_scan = Self {
|
|
|
+ id: id.to_string(),
|
|
|
+ config,
|
|
|
+ };
|
|
|
+
|
|
|
+ // Force re-run: clean up previous output directories for both normal and tumoral scans
|
|
|
+ if somatic_scan.config.somatic_scan_force {
|
|
|
+            if let Err(e) = fs::remove_dir_all(somatic_scan.config.somatic_scan_normal_output_dir(id)) { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } }
|
|
|
+            if let Err(e) = fs::remove_dir_all(somatic_scan.config.somatic_scan_tumoral_output_dir(id)) { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(somatic_scan)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl ShouldRun for SomaticScan {
|
|
|
+ /// Determines whether SomaticScan should re-run by checking whether
|
|
|
+ /// any of the count output files are outdated or missing relative to the BAMs.
|
|
|
+ fn should_run(&self) -> bool {
|
|
|
+ let mrd_bam_path = &self.config.normal_bam(&self.id);
|
|
|
+ let diag_bam_path = &self.config.tumoral_bam(&self.id);
|
|
|
+
|
|
|
+ match read_dict(&self.config.dict_file) {
|
|
|
+ Ok(dict) => {
|
|
|
+ for (contig, _) in dict {
|
|
|
+ let diag_count_file = self
|
|
|
+ .config
|
|
|
+ .somatic_scan_tumoral_count_file(&self.id, &contig);
|
|
|
+ if is_file_older(&diag_count_file, diag_bam_path).unwrap_or(true) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ let mrd_count_file = self
|
|
|
+ .config
|
|
|
+ .somatic_scan_normal_count_file(&self.id, &contig);
|
|
|
+ if is_file_older(&mrd_count_file, mrd_bam_path).unwrap_or(true) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ false
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ error!("Failed to read dict file: {}\n{e}", self.config.dict_file);
|
|
|
+ // Don't run if dict is unreadable
|
|
|
+ false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Run for SomaticScan {
|
|
|
+ /// Executes the full scan pipeline in parallel for first normal then tumoral samples.
|
|
|
+ ///
|
|
|
+ /// # Returns
|
|
|
+ /// An error if the underlying scan function (`par_whole_scan`) fails for either sample.
|
|
|
+ fn run(&mut self) -> anyhow::Result<()> {
|
|
|
+ info!("Starting scan for {} normal.", self.id);
|
|
|
+ par_whole_scan(&self.id, &self.config.normal_name, &self.config)?;
|
|
|
+ info!("Starting scan for {} tumoral.", self.id);
|
|
|
+ par_whole_scan(&self.id, &self.config.tumoral_name, &self.config)
|
|
|
+ }
|
|
|
+}
|