Thomas 1 месяц назад
Родитель
Сommit
1c81c5bf58
1 измененных файлов с 82 добавлено и 45 удалено
  1. 82 45
      src/callers/longcallD.rs

+ 82 - 45
src/callers/longcallD.rs

@@ -11,7 +11,7 @@ use uuid::Uuid;
 use crate::{
     commands::{
         bcftools::{BcftoolsConcat, BcftoolsKeepPass},
-        samtools::{SamtoolsIndex, SamtoolsMergeMany},
+        samtools::{SamtoolsIndex, SamtoolsMergeMany, SamtoolsSort},
         Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
         SlurmRunner,
     },
@@ -284,6 +284,85 @@ fn merge_parts(base: &LongcallD, n_parts: usize) -> anyhow::Result<()> {
     Ok(())
 }
 
+fn merge_bam_parts(
+    base: &LongcallD,
+    jobs: &[LongcallD],
+    actual_n_parts: usize,
+) -> anyhow::Result<()> {
+    // Sort each part BAM (raw -> sorted)
+    info!("Sorting {} part BAMs in parallel", actual_n_parts);
+    let sort_jobs: Vec<_> = jobs
+        .iter()
+        .map(|job| {
+            let input = PathBuf::from(job.output_bam_path());
+            let output = PathBuf::from(job.output_bam_path().replace(".bam", ".sorted.bam"));
+            SamtoolsSort::from_config(&job.config, input, output)
+        })
+        .collect();
+    run_many!(&base.config, sort_jobs)?
+        .iter()
+        .enumerate()
+        .try_for_each(|(i, output)| {
+            output
+                .save_to_file(format!("{}/samtools_sort_part{}_", base.log_dir, i + 1))
+                .map(|_| ())
+        })?;
+
+    // Index sorted part BAMs
+    info!("Indexing {} sorted part BAMs in parallel", actual_n_parts);
+    let sorted_bams: Vec<PathBuf> = jobs
+        .iter()
+        .map(|job| PathBuf::from(job.output_bam_path().replace(".bam", ".sorted.bam")))
+        .collect();
+    let index_jobs: Vec<_> = sorted_bams
+        .iter()
+        .map(|bam| SamtoolsIndex::from_config(&base.config, bam.to_str().unwrap()))
+        .collect();
+    run_many!(&base.config, index_jobs)?
+        .iter()
+        .enumerate()
+        .try_for_each(|(i, output)| {
+            output
+                .save_to_file(format!("{}/samtools_index_part{}_", base.log_dir, i + 1))
+                .map(|_| ())
+        })?;
+
+    // Merge sorted part BAMs into final BAM
+    let final_bam = PathBuf::from(
+        base.config
+            .longcalld_solo_bam(&base.case_id, &base.time_point),
+    );
+    info!(
+        "Merging {} sorted part BAMs into {}",
+        actual_n_parts,
+        final_bam.display()
+    );
+    run!(
+        &base.config,
+        &mut SamtoolsMergeMany::from_config(final_bam.clone(), sorted_bams, &base.config)
+    )
+    .context("Failed to merge longcallD part BAMs")?
+    .save_to_file(format!("{}/samtools_merge_", base.log_dir))
+    .map(|_| ())?;
+
+    // Index final merged BAM
+    run!(
+        &base.config,
+        &mut SamtoolsIndex::from_config(&base.config, final_bam.to_str().unwrap())
+    )
+    .context("Failed to index merged longcallD BAM")?
+    .save_to_file(format!("{}/samtools_index_final_", base.log_dir))
+    .map(|_| ())?;
+
+    info!(
+        "Successfully merged {} sorted BAM parts into {}",
+        actual_n_parts,
+        final_bam.display()
+    );
+
+    Ok(())
+}
+
 fn run_chunked(
     case_id: &str,
     time_point: &str,
@@ -349,50 +428,8 @@ fn run_chunked(
     // Merge PASS VCFs
     merge_parts(&base, actual_n_parts)?;
 
-    // Index per-part BAMs before merging
-    info!("Indexing {} part BAMs in parallel", actual_n_parts);
-    let index_jobs: Vec<_> = jobs
-        .iter()
-        .map(|job| SamtoolsIndex::from_config(&job.config, &job.output_bam_path()))
-        .collect();
-    run_many!(config, index_jobs)?
-        .iter()
-        .enumerate()
-        .try_for_each(|(i, output)| {
-            output
-                .save_to_file(format!("{}/samtools_index_part{}_", base.log_dir, i + 1))
-                .map(|_| ())
-        })?;
-
-    // Merge part BAMs into final BAM
-    let final_bam = PathBuf::from(
-        base.config
-            .longcalld_solo_bam(&base.case_id, &base.time_point),
-    );
-    let part_bams: Vec<PathBuf> = jobs
-        .iter()
-        .map(|job| PathBuf::from(job.output_bam_path()))
-        .collect();
-
-    info!(
-        "Merging {} part BAMs into {}",
-        actual_n_parts,
-        final_bam.display()
-    );
-    run!(
-        config,
-        &mut SamtoolsMergeMany::from_config(final_bam.clone(), part_bams, config)
-    )
-    .context("Failed to merge longcallD part BAMs")?
-    .save_to_file(format!("{}/samtools_merge_", base.log_dir))?;
-
-    // Index final merged BAM
-    run!(
-        config,
-        &mut SamtoolsIndex::from_config(config, final_bam.to_str().unwrap())
-    )
-    .context("Failed to index merged longcallD BAM")?
-    .save_to_file(format!("{}/samtools_index_final_", base.log_dir))?;
+    // Sort, index, and merge part BAMs into final BAM
+    merge_bam_parts(&base, &jobs, actual_n_parts)?;
 
     info!(
         "LongcallD completed for {} {}: {} parts merged",