@@ -80,9 +80,7 @@ use crate::{
commands::{
dorado::DoradoAlign,
modkit::ModkitSummary,
- run_many_sbatch,
samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort},
- SlurmRunner,
},
config::Config,
helpers::{get_genome_sizes, list_files_recursive, remove_bam_with_index, TempFileGuard},
@@ -312,50 +310,51 @@ impl PromRun {
Ok(prom_run)
}

- /// Validates that all provided BAMs are unaligned.
+ /// Partitions BAMs into unaligned and already-aligned groups.
///
- /// Returns an error if any BAM contains reference sequences in its header.
- fn validate_bams_unaligned(
+ /// Returns (unaligned, already_aligned) BAM lists.
+ fn partition_bams_by_alignment<'a>(
&self,
- pass_bams: &[&PromBam],
- pool: &rayon::ThreadPool,
- ) -> anyhow::Result<()> {
- let aligned_bams: Vec<PathBuf> = pool.install(|| {
- pass_bams
- .par_iter()
- .filter_map(|bam| match bam::Reader::from_path(&bam.path) {
+ pass_bams: &[&'a PromBam],
+ ) -> (Vec<&'a PromBam>, Vec<&'a PromBam>) {
+ let results: Vec<_> = pass_bams
+ .par_iter()
+ .map(|&bam| {
+ let is_aligned = match bam::Reader::from_path(&bam.path) {
Ok(reader) => {
let header = bam::Header::from_template(reader.header());
match get_genome_sizes(&header) {
- Ok(sizes) if !sizes.is_empty() => Some(bam.path.clone()),
- Ok(_) => None,
+ Ok(sizes) => !sizes.is_empty(),
Err(_) => {
warn!(
"Failed to parse header for {}, assuming unaligned",
bam.path.display()
);
- None
+ false
}
}
}
Err(e) => {
warn!("Failed to open BAM {}: {}", bam.path.display(), e);
- None
+ false
}
- })
- .collect()
- });
+ };
+ (bam, is_aligned)
+ })
+ .collect();

- if !aligned_bams.is_empty() {
- bail!(
- "Found {} BAM file(s) that appear to be already aligned. \
- This method only processes unaligned BAMs. First aligned BAM: {}",
- aligned_bams.len(),
- aligned_bams[0].display()
- );
+ let mut unaligned = Vec::new();
+ let mut aligned = Vec::new();
+
+ for (bam, is_aligned) in results {
+ if is_aligned {
+ aligned.push(bam);
+ } else {
+ unaligned.push(bam);
+ }
}

- Ok(())
+ (unaligned, aligned)
}

/// Maps BAM files to cases based on kit type (multiplexed vs non-multiplexed).
@@ -528,68 +527,100 @@ impl PromRun {

info!("Filtered to {} BAM files from bam_pass", pass_bams.len());

- let pool = ThreadPoolBuilder::new()
- .num_threads(config.threads.into())
- .build()
- .context("Failed to build thread pool")?;
+ info!("Step 1/5: Checking BAM alignment status");
+ let (unaligned_bams, aligned_bams) = self.partition_bams_by_alignment(&pass_bams);

- info!("Step 1/6: Validating BAMs are unaligned");
- self.validate_bams_unaligned(&pass_bams, &pool)?;
+ if !aligned_bams.is_empty() {
+ info!(
+ "Found {} already-aligned BAMs — will skip alignment for these",
+ aligned_bams.len()
+ );
+ }
+
+ if unaligned_bams.is_empty() && aligned_bams.is_empty() {
+ bail!("No valid BAM files to process");
+ }

metrics.validation_duration = validation_start.elapsed();
metrics.bams_processed = pass_bams.len();
metrics.bytes_input = pass_bams.iter().map(|b| b.bam_size).sum();

- info!("✅ All {} BAM files are unaligned", pass_bams.len());
+ info!(
+ "✅ {} unaligned, {} already aligned",
+ unaligned_bams.len(),
+ aligned_bams.len()
+ );

// =====================================================================
// Step 3: Map BAMs to cases
// =====================================================================

- info!("Step 2/6: Mapping BAM files to cases");
- let (bam_to_case, unmatched) = self.map_bams_to_cases(&pass_bams, kit_type)?;
+ info!("Step 2/5: Mapping BAM files to cases");
+ // Map unaligned BAMs (need alignment)
+ let (unaligned_bam_to_case, unmatched_unaligned) =
+ self.map_bams_to_cases(&unaligned_bams, kit_type)?;

- if bam_to_case.is_empty() {
- bail!("No BAMs were successfully mapped to cases");
- }
+ // Map aligned BAMs (skip alignment, go straight to sort/merge)
+ let (aligned_bam_to_case, unmatched_aligned) =
+ self.map_bams_to_cases(&aligned_bams, kit_type)?;
+
+ let total_mapped = unaligned_bam_to_case.len() + aligned_bam_to_case.len();
+ let total_unmatched = unmatched_unaligned.len() + unmatched_aligned.len();

- if !unmatched.is_empty() {
- warn!("{} BAMs could not be matched to cases", unmatched.len());
+ if total_mapped == 0 {
+ bail!("No BAMs were successfully mapped to cases");
}

info!(
"✅ Mapped {} BAMs to cases ({} unmatched)",
- bam_to_case.len(),
- unmatched.len()
+ total_mapped, total_unmatched
);

// =====================================================================
// Step 4: Prepare and run alignment
// =====================================================================

- info!("Step 3/6: Preparing alignment jobs");
let align_start = Timer::start();
- let (align_jobs, case_bam_map) = prepare_alignment_jobs(&bam_to_case, &tmp_dir, config)?;
+ let mut case_bam_map: HashMap<String, Vec<PathBuf>> = HashMap::new();

- // Track temp files for cleanup
- for bams in case_bam_map.values() {
- temp_guard.track_many(bams.clone());
+ // Add already-aligned BAMs directly to the map (no alignment needed)
+ for (bam_path, case) in &aligned_bam_to_case {
+ let sample_type_dir = map_sample_type_to_dir(&case.sample_type, config);
+ let key = format!("{}_{}", case.case_id, sample_type_dir);
+ case_bam_map.entry(key).or_default().push(bam_path.clone());
}

- info!("Step 4/6: Running {} alignment jobs", align_jobs.len());
- run_many!(config, align_jobs).context("Alignment batch failed")?;
+ if !unaligned_bam_to_case.is_empty() {
+ let (align_jobs, aligned_case_map) =
+ prepare_alignment_jobs(&unaligned_bam_to_case, &tmp_dir, config)?;

- self.verify_outputs_exist(&case_bam_map, "alignment")?;
+ // Track temp files for cleanup
+ for bams in aligned_case_map.values() {
+ temp_guard.track_many(bams.clone());
+ }

- metrics.alignment_duration = align_start.elapsed();
+ info!("Step 3/5: Running {} alignment jobs", align_jobs.len());
+ run_many!(config, align_jobs).context("Alignment batch failed")?;
+
+ self.verify_outputs_exist(&aligned_case_map, "alignment")?;
+
+ // Merge newly aligned BAMs into the case map
+ for (key, bams) in aligned_case_map {
+ case_bam_map.entry(key).or_default().extend(bams);
+ }

- info!("✅ Alignments completed");
+ info!("✅ Alignments completed");
+ } else {
+ info!("Step 3/5: Skipping alignment — all BAMs already aligned");
+ }
+
+ metrics.alignment_duration = align_start.elapsed();

// =====================================================================
// Step 5: Sort and index chunks
// =====================================================================

- info!("Step 5/6: Sorting and indexing chunks");
+ info!("Step 4/5: Sorting and indexing chunks");
let sort_start = Timer::start();
let sorted_bam_map = sort_and_index_chunks(&case_bam_map, config)?;

@@ -607,7 +638,7 @@ impl PromRun {
// Step 6: Merge and finalize per case
// =====================================================================

- info!("Step 6/6: Finalizing per-case BAMs");
+ info!("Step 5/5: Finalizing per-case BAMs");
let finalize_start = Timer::start();

let finalized = finalize_case_bams(&self.cases, &sorted_bam_map, &tmp_dir, config)?;
@@ -1175,8 +1206,7 @@ fn merge_chunk_bams(

let mut merge_cmd =
SamtoolsMergeMany::from_config(merged_path.clone(), chunk_bams.to_vec(), config);
-
- SlurmRunner::exec(&mut merge_cmd).with_context(|| {
+ run!(config, &mut merge_cmd).with_context(|| {
format!(
"Failed to merge {} chunk BAMs for case {}",
chunk_bams.len(),
@@ -1229,7 +1259,9 @@ fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
}

// Remove backup and its orphaned index
- remove_bam_with_index(&backup).ok();
+ if let Err(e) = remove_bam_with_index(&backup) {
+ log::warn!("Failed to clean up backup file {}: {}", backup.display(), e);
+ }
} else {
fs::rename(source, destination).with_context(|| {
format!(
@@ -1244,12 +1276,9 @@ fn atomic_replace(source: &Path, destination: &Path) -> anyhow::Result<()> {
}
/// Indexes a BAM file using samtools.
fn index_bam(bam: &Path, config: &Config) -> anyhow::Result<()> {
- let mut index_cmd = SamtoolsIndex::from_config(config, &bam.to_string_lossy().to_string());
+ let mut index_cmd = SamtoolsIndex::from_config(config, bam.to_string_lossy().as_ref());
run!(config, &mut index_cmd)?;

- // SlurmRunner::run(&mut index_cmd)?;
- // LocalRunner::run(&mut index_cmd)?;
-
let index_path = PathBuf::from(format!("{}.bai", bam.display()));
if !index_path.exists() {
bail!("Index file not created: {}", index_path.display());
@@ -1294,7 +1323,13 @@ fn merge_into_existing_final(
index_bam(destination, config)
.with_context(|| format!("Failed to index final BAM: {}", destination.display()))?;

- remove_bam_with_index(source).ok();
+ if let Err(e) = remove_bam_with_index(source) {
+ log::warn!(
+ "Failed to clean up merged source {}: {}",
+ source.display(),
+ e
+ );
+ }

Ok(())
}
@@ -1369,7 +1404,7 @@ fn sort_and_index_chunks(
}

info!("Submitting {} sort jobs", sort_jobs.len());
- run_many_sbatch(sort_jobs).context("Sort batch failed")?;
+ run_many!(config, sort_jobs).context("Sort batch failed")?;

// Verify sorted BAM exists.
let missing: Vec<_> = original_to_sorted
@@ -1388,11 +1423,11 @@ fn sort_and_index_chunks(
// Index every sorted BAM.
let index_jobs: Vec<SamtoolsIndex> = original_to_sorted
.values()
- .map(|sorted| SamtoolsIndex::from_config(config, &sorted.to_string_lossy().as_ref()))
+ .map(|sorted| SamtoolsIndex::from_config(config, sorted.to_string_lossy().as_ref()))
.collect();

info!("Submitting {} index jobs", index_jobs.len());
- run_many_sbatch(index_jobs).context("Index batch failed")?;
+ run_many!(config, index_jobs).context("Index batch failed")?;

// Verify index exists for each BAM.
let missing_indices: Vec<_> = original_to_sorted
@@ -1411,7 +1446,13 @@ fn sort_and_index_chunks(

// Remove input BAMs.
for original in original_to_sorted.keys() {
- fs::remove_file(original).ok();
+ if let Err(e) = fs::remove_file(original) {
+ log::warn!(
+ "Failed to remove unsorted BAM {}: {}",
+ original.display(),
+ e
+ );
+ }
}

// Construct the new map(case_id) -> BAMs
@@ -1474,7 +1515,9 @@ fn finalize_case_bams(

for chunk in chunk_bams {
if chunk.exists() {
- remove_bam_with_index(chunk).ok();
+ if let Err(e) = remove_bam_with_index(chunk) {
+ log::warn!("Failed to clean up temp chunk {}: {}", chunk.display(), e);
+ }
}
}

@@ -1515,6 +1558,14 @@ fn normalize_barcode(barcode: &str) -> String {
stripped.to_string()
}

+lazy_static! {
+ static ref BARCODE_PATTERNS: [regex::Regex; 3] = [
+ regex::Regex::new(r"(?i)barcode(\d+)").unwrap(),
+ regex::Regex::new(r"(?i)nb(\d+)").unwrap(),
+ regex::Regex::new(r"(?i)bc(\d+)").unwrap(),
+ ];
+}
+
/// Extract barcode number from various filename patterns.
///
/// Recognizes:
@@ -1522,14 +1573,7 @@ fn normalize_barcode(barcode: &str) -> String {
/// - `*_NB01_*`
/// - `barcode01/`
fn extract_and_normalize_barcode(text: &str) -> Option<String> {
- // Pattern: barcode followed by digits
- let patterns = [
- regex::Regex::new(r"(?i)barcode(\d+)").ok()?,
- regex::Regex::new(r"(?i)nb(\d+)").ok()?,
- regex::Regex::new(r"(?i)bc(\d+)").ok()?,
- ];
-
- for pattern in &patterns {
+ for pattern in BARCODE_PATTERNS.iter() {
if let Some(caps) = pattern.captures(text) {
if let Some(num_str) = caps.get(1) {
if let Ok(num) = num_str.as_str().parse::<u32>() {
@@ -1616,35 +1660,6 @@ mod tests {

use crate::helpers::test_init;

- #[test]
- fn prom_run_bam() -> anyhow::Result<()> {
- test_init();
-
- let bam_file = "/mnt/beegfs02/scratch/t_steimle/test_data/inputs/test_run_A/bam_pass/barcode02/PBI55810_pass_barcode02_22582b29_d02c5bb8_0.bam";
- let bam = PromBam::from_path(bam_file)?;
- info!("{bam}");
-
- let bam_file = "/home/t_steimle/mnt/prom/20251121_001_01_CD/01/20251121_1531_P2I-00461-A_PBI52256_b1dd5673/bam_pass/PBI52256_pass_b1dd5673_414982db_0.bam";
- let bam = PromBam::from_path(bam_file)?;
- info!("{bam}");
- Ok(())
- }
-
- #[test]
- fn prom_run_import() -> anyhow::Result<()> {
- test_init();
- let config = Config::default();
-
- let dir = "/home/t_steimle/beegfs02/prom_runs/20251107_OL_001_A-B/A/20251117_0915_P2I-00461-A_PBI55810_22582b29";
- let prom_run = PromRun::from_dir(dir, &config)?;
- info!("{prom_run}");
-
- let dir = "/mnt/beegfs02/scratch/t_steimle/data/prom/20251121_001_01_CD/01/20251121_1531_P2I-00461-A_PBI52256_b1dd5673";
- let prom_run = PromRun::from_dir(dir, &config)?;
- info!("{prom_run}");
- Ok(())
- }
-
#[test]
fn prom_run_process() -> anyhow::Result<()> {
test_init();