Thomas 1 week ago
parent
commit
8b2c676daf
4 changed files with 143 additions and 45 deletions
  1. 1 1
      pandora-config.example.toml
  2. 9 0
      slurm-2507862.out
  3. 23 40
      src/callers/deep_variant.rs
  4. 110 4
      src/helpers.rs

+ 1 - 1
pandora-config.example.toml

@@ -165,7 +165,7 @@ min_n_callers = 1
 deepvariant_output_dir = "{result_dir}/{id}/{time}/DeepVariant"
 
 # Threads for DeepVariant.
-deepvariant_threads = 40
+deepvariant_threads = 20
 
 # DeepVariant singularity image path
 deepvariant_image = "/mnt/beegfs02/scratch/t_steimle/somatic_pipe_tools/deepvariant_latest-gpu.sif"

File diff suppressed because it is too large
+ 9 - 0
slurm-2507862.out


+ 23 - 40
src/callers/deep_variant.rs

@@ -2,6 +2,7 @@ use anyhow::Context;
 use log::{debug, info};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use regex::Regex;
+use rust_htslib::bam::{self, Read};
 use std::{
     fmt, fs,
     path::{Path, PathBuf},
@@ -15,10 +16,13 @@ use crate::{
         vcf::Vcf,
     },
     commands::{
-        SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, run_many_sbatch
+        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        run_many_sbatch, SlurmRunner,
     },
     config::Config,
-    helpers::{is_file_older, remove_dir_if_exists},
+    helpers::{
+        get_genome_sizes, is_file_older, remove_dir_if_exists, split_genome_into_n_regions_exact,
+    },
     io::vcf::read_vcf,
     pipes::{InitializeSolo, ShouldRun, Version},
     runners::Run,
@@ -236,14 +240,15 @@ impl JobCommand for DeepVariant {
 
         format!(
             "{singularity_bin} exec --nv \
-            --bind /data:/data \
+            --bind /mnt:/mnt \
+            --bind /home/t_steimle:/home/t_steimle \
             --bind {output_dir}:/output \
             {image} \
             /opt/deepvariant/bin/run_deepvariant \
             --model_type={model_type} \
             --ref={reference} \
             --reads={bam} \
-            --regions='{regions}' \
+            --regions=\"{regions}\" \
             {haploid_flag} \
             --par_regions_bed={par_bed} \
             --output_vcf={output_vcf} \
@@ -690,40 +695,6 @@ fn parse_deepvariant_version(output: &str) -> anyhow::Result<String> {
         .to_string())
 }
 
/// Splits a slice of regions into at most `n` roughly equal chunks.
///
/// Each chunk is rendered as a single comma-separated string, which is the
/// form DeepVariant expects for its `--regions` argument.
///
/// # Arguments
///
/// * `all_regions` - Region names (e.g., `["chr1", "chr2", ...]`)
/// * `n` - Desired number of chunks
///
/// # Returns
///
/// Comma-joined chunk strings. Fewer than `n` chunks are returned when
/// there are not enough regions to fill all of them.
///
/// # Examples
///
/// ```
/// let regions = ["chr1", "chr2", "chr3", "chr4", "chr5"];
/// let chunks = split_regions_into_n(&regions, 2);
/// assert_eq!(chunks, vec!["chr1,chr2,chr3", "chr4,chr5"]);
/// ```
fn split_regions_into_n(all_regions: &[&str], n: usize) -> Vec<String> {
    if all_regions.is_empty() || n == 0 {
        return Vec::new();
    }
    // Ceiling division: every chunk except possibly the last is full.
    let per_chunk = all_regions.len().div_ceil(n);
    let mut joined = Vec::with_capacity(n);
    for chunk in all_regions.chunks(per_chunk) {
        joined.push(chunk.join(","));
    }
    joined
}
-
 /// Run DeepVariant in 4 sbatch jobs (by regions), then merge their PASS VCFs
 /// into the final deepvariant_solo_passed_vcf().
 pub fn run_deepvariant_quartered_sbatch_with_merge(
@@ -731,6 +702,7 @@ pub fn run_deepvariant_quartered_sbatch_with_merge(
     time_point: &str,
     config: &Config,
 ) -> anyhow::Result<Vec<CapturedOutput>> {
+    let n_parts = 4;
     let base = DeepVariant::initialize(id, time_point, config)?;
 
     // if already up-to-date, nothing to do
@@ -739,9 +711,20 @@ pub fn run_deepvariant_quartered_sbatch_with_merge(
         return Ok(Vec::new());
     }
 
+    let bam_path = config.solo_bam(id, time_point);
+
     // 1) split regions into 4 chunks
-    let all_regions: Vec<&str> = base.regions.split(',').collect();
-    let region_chunks = split_regions_into_n(&all_regions, 4);
+    let reader = bam::Reader::from_path(&bam_path)
+        .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
+    let header = bam::Header::from_template(reader.header());
+    let genome_sizes = get_genome_sizes(&header)?;
+
+    // Split genome into regions
+    let region_chunks = split_genome_into_n_regions_exact(&genome_sizes, n_parts)
+        .into_iter()
+        .map(|v| v.join(" "))
+        .collect::<Vec<String>>();
+
     let n_parts = region_chunks.len();
 
     // Build parallel jobs

+ 110 - 4
src/helpers.rs

@@ -794,9 +794,10 @@ impl Drop for TempDirGuard {
     }
 }
 
-
 /// Extracts genome sizes from BAM header.
-pub fn get_genome_sizes(header: &rust_htslib::bam::Header) -> anyhow::Result<FxHashMap<String, u64>> {
+pub fn get_genome_sizes(
+    header: &rust_htslib::bam::Header,
+) -> anyhow::Result<FxHashMap<String, u64>> {
     let mut sizes = FxHashMap::default();
 
     for (_, records) in header.to_hashmap() {
@@ -859,12 +860,91 @@ pub fn split_genome_into_n_regions(
     regions
 }
 
+/// Split genome into exactly `n_parts` chunks with (almost) equal total size.
+/// Each chunk is a *list* of regions `ctg:start-end`. A chunk may span several
+/// contigs, but each region is within a single contig.
+///
+/// The sizes of the parts will differ by at most 1 base.
+pub fn split_genome_into_n_regions_exact(
+    genome_sizes: &FxHashMap<String, u64>,
+    n_parts: usize,
+) -> Vec<Vec<String>> {
+    if n_parts == 0 || genome_sizes.is_empty() {
+        return Vec::new();
+    }
+
+    // Deterministic contig order
+    let mut contigs: Vec<(String, u64)> = genome_sizes
+        .iter()
+        .map(|(ctg, len)| (ctg.clone(), *len))
+        .collect();
+    contigs.sort_by(|a, b| a.0.cmp(&b.0));
+
+    let total_bases: u64 = contigs.iter().map(|(_, len)| *len).sum();
+    if total_bases == 0 {
+        return Vec::new();
+    }
+
+    // We cannot have more non-empty parts than bases if we use 1-based inclusive coordinates.
+    let parts = n_parts.min(total_bases as usize);
+
+    // Distribute bases as evenly as possible:
+    // first `remainder` parts get (base_per_part + 1), the rest get base_per_part.
+    let base_per_part = total_bases / parts as u64;
+    let remainder = total_bases % parts as u64;
+
+    let mut target_sizes: Vec<u64> = (0..parts)
+        .map(|i| {
+            if i < remainder as usize {
+                base_per_part + 1
+            } else {
+                base_per_part
+            }
+        })
+        .collect();
+
+    let mut chunks: Vec<Vec<String>> = Vec::with_capacity(parts);
+
+    let mut part_idx = 0;
+    let mut remaining_in_part = target_sizes[0];
+
+    let mut ctg_iter = contigs.into_iter();
+    while let Some((ctg, mut len)) = ctg_iter.next() {
+        let mut start: u64 = 1;
+
+        while len > 0 && part_idx < parts {
+            let take = len.min(remaining_in_part);
+            let end = start + take - 1;
+
+            if chunks.len() <= part_idx {
+                chunks.push(Vec::new());
+            }
+            chunks[part_idx].push(format!("{ctg}:{start}-{end}"));
+
+            len -= take;
+            start = end + 1;
+            remaining_in_part -= take;
+
+            if remaining_in_part == 0 {
+                part_idx += 1;
+                if part_idx == parts {
+                    break;
+                }
+                remaining_in_part = target_sizes[part_idx];
+            }
+        }
+    }
+
+    // Sanity: we should have exactly `parts` chunks, all non-empty
+    debug_assert_eq!(chunks.len(), parts);
+    chunks
+}
+
 /// Removes a BAM file and its associated index file (.bai).
 pub fn remove_bam_with_index(bam: &Path) -> anyhow::Result<()> {
     // Remove BAM
     if bam.exists() {
-        fs::remove_file(bam)
-            .with_context(|| format!("Failed to remove BAM: {}", bam.display()))?;
+        fs::remove_file(bam).with_context(|| format!("Failed to remove BAM: {}", bam.display()))?;
     }
 
     // Remove index (.bam.bai or .bai)
@@ -880,3 +960,29 @@ pub fn remove_bam_with_index(bam: &Path) -> anyhow::Result<()> {
 
     Ok(())
 }
+
#[cfg(test)]
mod tests {
    use log::info;
    use rust_htslib::bam::{self, Read};

    use crate::helpers::test_init;

    use super::*;

    /// Smoke test: read contig sizes from a real BAM header and split the
    /// genome into 4 parts.
    ///
    /// Depends on a site-specific BAM on the cluster filesystem, so it is
    /// ignored by default; run with `cargo test -- --ignored` on the cluster.
    #[test]
    #[ignore = "requires site-specific BAM on the cluster filesystem"]
    fn split_g() -> anyhow::Result<()> {
        test_init();
        let n_parts = 4;
        let bam_path = "/mnt/beegfs02/scratch/t_steimle/data/wgs/34528/norm/34528_norm_hs1.bam";
        let reader = bam::Reader::from_path(bam_path)
            .with_context(|| format!("Failed to open BAM: {bam_path}"))?;
        let header = bam::Header::from_template(reader.header());
        let genome_sizes = get_genome_sizes(&header)?;

        // Split genome into `n_parts` chunks of (almost) equal total size.
        let regions = split_genome_into_n_regions_exact(&genome_sizes, n_parts);

        // A whole genome has far more bases than parts, so we must get
        // exactly `n_parts` chunks back.
        assert_eq!(regions.len(), n_parts);
        info!("{:#?}", regions);
        Ok(())
    }
}

Some files were not shown because too many files changed in this diff