Переглянути джерело

assembler pipe with recursion

Thomas 1 місяць тому
батько
коміт
dcd1aa64f7
7 змінених файлів з 591 додано та 57 видалено
  1. 0 0
      cargo
  2. 66 2
      pandora-config.example.toml
  3. 34 9
      src/aligner/minimap.rs
  4. 6 0
      src/config.rs
  5. 288 39
      src/de_novo/de_novo_pipe.rs
  6. 17 0
      src/de_novo/medaka.rs
  7. 180 7
      src/io/bam.rs

+ 66 - 2
pandora-config.example.toml

@@ -431,11 +431,75 @@ coral_max_seg_gap = 100000
 #######################################
 # Flye
 #######################################
+ 
+# Path to the Flye binary. Can be a python-prefixed call if Flye is not
+# installed as a standalone executable.
 flye_bin = "/usr/bin/python /home/t_steimle/somatic_pipe_tools/Flye/bin/flye"
-
+ 
+# Number of threads allocated to Flye. 8–16 is sufficient for local assembly
+# of a single locus; diminishing returns above 16.
 flye_threads = 12
-
+ 
+# Memory allocated to the Flye SLURM job. 16G is comfortable for local
+# assembly (<1 Mb target). Increase to 32G+ for larger regions.
 flye_slurm_mem = "16G"
+ 
+#######################################
+# Medaka
+#######################################
+ 
+# Name of the conda environment containing medaka.
+# Activated via conda_sh before running medaka_consensus.
+medaka_env = "medaka_env"
+ 
+# Path to the medaka_consensus binary within the conda environment.
+# Usually just "medaka_consensus" if the env is correctly activated.
+medaka_consensus_bin = "medaka_consensus"
+ 
+# Number of threads for medaka. Used for the minimap2 alignment step;
+# the neural network inference step is GPU-bound when a GPU is available.
+medaka_threads = 8
+ 
+# Memory allocated to the Medaka SLURM job. 16G is sufficient for local
+# polishing of a small assembly.
+medaka_slurm_mem = "16G"
+ 
+# Medaka model — MUST match the basecalling chemistry and Dorado version exactly.
+# Using the wrong model silently degrades polishing quality.
+#
+# Model naming: {chemistry}_{flowcell}_{speed}bps_{caller}_{version}
+#   r1041_e82 = R10.4.1 flowcell
+#   400bps    = 400 bps sampling rate (standard; 260bps is legacy)
+#   sup       = Dorado sup basecalling (use hac if basecalled with hac)
+#
+# Current default (medaka tools list_models): r1041_e82_400bps_sup_v5.2.0
+#
+# v5.2.0 also has dwell-time variants for improved homopolymer resolution:
+#   r1041_e82_400bps_sup_v5.2.0_rl_lstm384_dwells    — use if Dorado called with dwell times
+#   r1041_e82_400bps_sup_v5.2.0_rl_lstm384_no_dwells — use if Dorado called without dwell times
+#
+# For R9.4.1 data use r941_min_sup_g507 (MinION) or r941_prom_sup_g507 (PromethION).
+# Run `medaka tools list_models` to list all available models.
+medaka_model = "r1041_e82_400bps_sup_v5.2.0"
+
+#######################################
+# Minimap2
+#######################################
+ 
+# Path to the minimap2 binary. Use a versioned path to ensure reproducibility
+# across pipeline runs — minimap2 output is version-sensitive.
+minimap2_bin = "/home/t_steimle/somatic_pipe_tools/minimap2-2.30_x64-linux/minimap2"
+ 
+# Number of threads for minimap2 alignment. Scales linearly up to ~16;
+# 16 is appropriate for read→reference alignment on a full WGS BAM.
+# For local assembly realignment (few hundred reads) 8 is sufficient.
+minimap2_threads = 16
+ 
+# Memory allocated to the minimap2 SLURM job.
+# 32G is required for read→reference alignment against a human genome
+# (minimap2 loads the MMI index into memory: ~14G for hg38 map-ont).
+# Can be reduced to 8G for contig→contig or local assembly realignment.
+minimap2_slurm_mem = "32G"
 
 #######################################
 # Marlin

+ 34 - 9
src/aligner/minimap.rs

@@ -14,7 +14,7 @@ use crate::{
 
 // ─── Minimap2BuildMmi ─────────────────────────────────────────────────────────
 
-#[derive(Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Minimap2Preset {
     /// Raw ONT reads → reference genome.
     /// Reference is taken from config.reference — MMI always precomputed.
@@ -22,6 +22,9 @@ pub enum Minimap2Preset {
     /// Assembled contigs → single-contig FASTA.
     /// Reference is provided explicitly — no MMI.
     Asm5(PathBuf),
+    /// Assembled contigs → reference genome.
+    /// Uses asm5 preset with reference FASTA directly — no MMI.
+    Asm5Reference,
 }
 
 impl Minimap2Preset {
@@ -29,20 +32,24 @@ impl Minimap2Preset {
         match self {
             Minimap2Preset::MapOntRef => "map-ont",
             Minimap2Preset::Asm5(_) => "asm5",
+            Minimap2Preset::Asm5Reference => "asm5",
         }
     }
 }
 
 /// Precompute a minimap2 index (.mmi) for a reference FASTA.
 ///
-/// Only dispatched if the MMI does not already exist — callers should check
-/// `Minimap2BuildMmi::needed()` before constructing and running this job.
+/// Idempotent — if the MMI already exists at the expected path, run() is a no-op.
+/// Dispatched to SLURM when slurm_runner is set, locally otherwise.
 ///
-/// Runs on the cluster (SbatchRunner) when slurm_runner is set in config,
-/// locally otherwise — consistent with all other Pandora jobs.
+/// Used for both map-ont (read alignment) and asm5 (contig alignment) presets.
+/// The MMI path is derived as `{reference}.{preset}.mmi` e.g.:
+///   chm13v2.0.fa → chm13v2.0.map-ont.mmi
+///   chm13v2.0.fa → chm13v2.0.asm5.mmi
 pub struct Minimap2BuildMmi {
     pub reference: PathBuf,
     pub out_mmi: PathBuf,
+    pub preset: &'static str,
     pub minimap2_bin: String,
     pub threads: u8,
     pub slurm_mem: Option<String>,
@@ -50,8 +57,17 @@ pub struct Minimap2BuildMmi {
 }
 
 impl Minimap2BuildMmi {
-    pub fn init(config: &Config, reference: PathBuf) -> Self {
-        let out_mmi = reference.with_extension("map-ont.mmi");
+    pub fn init(config: &Config, reference: PathBuf, preset: &'static str) -> Self {
+        // Derive MMI path: reference.{preset}.mmi
+        // e.g. chm13v2.0.fa → chm13v2.0.map-ont.mmi or chm13v2.0.asm5.mmi
+        let out_mmi = reference.with_extension(format!(
+            "{}.{}.mmi",
+            reference
+                .extension()
+                .and_then(|e| e.to_str())
+                .unwrap_or("fa"),
+            preset
+        ));
 
         let log_dir = format!("{}/log/minimap2_index", config.result_dir);
         let slurm_mem = if config.slurm_runner {
@@ -63,6 +79,7 @@ impl Minimap2BuildMmi {
         Self {
             reference,
             out_mmi,
+            preset,
             minimap2_bin: config.minimap2_bin.clone(),
             threads: config.minimap2_threads,
             slurm_mem,
@@ -85,8 +102,9 @@ impl Minimap2BuildMmi {
 impl JobCommand for Minimap2BuildMmi {
     fn cmd(&self) -> String {
         format!(
-            "{bin} -x map-ont -t {threads} -d {out} {reference}",
+            "{bin} -x {preset} -t {threads} -d {out} {reference}",
             bin = self.minimap2_bin,
+            preset = self.preset,
             threads = self.threads,
             out = self.out_mmi.display(),
             reference = self.reference.display(),
@@ -203,7 +221,14 @@ impl Minimap2AlignOnt {
     ) -> anyhow::Result<Self> {
         let reference = match &preset {
             Minimap2Preset::MapOntRef => {
-                let mut mmi_job = Minimap2BuildMmi::init(config, config.reference.clone().into());
+                let mut mmi_job =
+                    Minimap2BuildMmi::init(config, config.reference.clone().into(), "map-ont");
+                mmi_job.run()?;
+                mmi_job.out_mmi
+            }
+            Minimap2Preset::Asm5Reference => {
+                let mut mmi_job =
+                    Minimap2BuildMmi::init(config, config.reference.clone().into(), "asm5");
                 mmi_job.run()?;
                 mmi_job.out_mmi
             }

+ 6 - 0
src/config.rs

@@ -535,6 +535,12 @@ pub struct Config {
     pub minimap2_bin: String,
     pub minimap2_threads: u8,
     pub minimap2_slurm_mem: String,
+
+    pub medaka_env: String,
+    pub medaka_consensus_bin: String,
+    pub medaka_threads: u8,
+    pub medaka_slurm_mem: String,
+    pub medaka_model: String,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]

+ 288 - 39
src/de_novo/de_novo_pipe.rs

@@ -10,10 +10,14 @@ use crate::{
     aligner::minimap::{Minimap2AlignOnt, Minimap2Preset},
     commands::samtools::{SamtoolsIndex, SamtoolsSort},
     config::Config,
-    de_novo::{Assembler, Polisher},
+    de_novo::{flye::FlyeAssembler, medaka::MedakaPolisher, Assembler, Polisher},
     helpers::TempFileGuard,
-    io::{fasta::split_fasta, fastq::write_fastq},
-    run, run_many,
+    io::{
+        bam::{bam_to_aligned_bed, primary_record},
+        fasta::split_fasta,
+        fastq::write_fastq,
+    },
+    run,
     runners::Run,
 };
 
@@ -26,6 +30,7 @@ pub struct ContigAssemblyResult {
     pub contig_fasta: PathBuf,
     /// Reads realigned to this contig with MM/ML/MN tags from original records.
     pub aligned_bam: PathBuf,
+    pub contig_ref_bed: PathBuf,
     /// Read names excluded because they produced supplementary alignments on
     /// this contig. Non-empty signals that the input region should be tightened.
     /// Note: a read suspicious on contig A but absent here was cleanly placed
@@ -46,16 +51,6 @@ pub struct LocalAssemblyConfig {
     pub config: Config,
 }
 
-impl LocalAssemblyConfig {
-    fn from_config(case_id: String, min_records: usize, config: Config) -> Self {
-        Self {
-            min_records,
-            case_id,
-            config,
-        }
-    }
-}
-
 // ─── Pipeline orchestrator ────────────────────────────────────────────────────
 
 /// Orchestrate local de novo assembly from a set of primary htslib records.
@@ -65,12 +60,15 @@ impl LocalAssemblyConfig {
 ///
 /// Steps 1, 5, 6 run locally (pure Rust, Windows-safe).
 /// Steps 2–4 are dispatched via `run!` / `run_many!` through SlurmRunner.
+/// Run a single round of local assembly.
+/// Accepts boxed assembler and polisher to allow dynamic dispatch
+/// and iterative/recursive invocation with different tools per round.
 pub fn run_local_assembly(
     records: &[bam::Record],
-    mut assembler: impl Assembler,
-    mut polisher: impl Polisher,
+    mut assembler: BoxedAssembler,
+    mut polisher: BoxedPolisher,
     work_dir: PathBuf,
-    config: LocalAssemblyConfig,
+    config: &LocalAssemblyConfig,
 ) -> anyhow::Result<Vec<ContigAssemblyResult>> {
     if records.len() < config.min_records {
         anyhow::bail!(
@@ -85,11 +83,10 @@ pub fn run_local_assembly(
     write_fastq(records, &reads_path).context("FASTQ write failed")?;
 
     // Step 2 — assemble (cluster)
-    run!(&config.config, &mut assembler)?;
+    run!(&config.config, &mut *assembler)?;
 
     // Step 3 — polish (cluster)
-    // draft set at construction to assembler.assembly_fasta()
-    run!(&config.config, &mut polisher)?;
+    run!(&config.config, &mut *polisher)?;
 
     // Step 4 — split polished FASTA into per-contig files (local)
     let contigs_dir = work_dir.join("contigs");
@@ -110,7 +107,6 @@ pub fn run_local_assembly(
     let mut guard = TempFileGuard::new();
 
     // Step 5 — realign reads to each contig independently (cluster, sequential)
-    // Asm5(contig_fasta) — no MMI, reference is the single-contig FASTA
     let mut realigners: Vec<Minimap2AlignOnt> = contigs
         .iter()
         .map(|c| {
@@ -129,7 +125,6 @@ pub fn run_local_assembly(
     }
 
     // Step 6 — align contigs to reference genome (cluster, sequential)
-    // MapOnt — MMI precomputed inside init if not already present
     let mut contig_aligners: Vec<Minimap2AlignOnt> = contigs
         .iter()
         .map(|c| {
@@ -137,8 +132,8 @@ pub fn run_local_assembly(
                 &config.case_id,
                 &config.config,
                 c.fasta_path.clone(),
-                work_dir.join(format!("{}.ref.bam", c.name)),
-                Minimap2Preset::MapOntRef,
+                work_dir.join(format!("{}_{}.bam", c.name, config.config.reference_name)),
+                Minimap2Preset::Asm5Reference,
             )
         })
         .collect::<anyhow::Result<Vec<_>>>()?;
@@ -156,15 +151,13 @@ pub fn run_local_assembly(
         .zip(contig_aligners.iter())
         .map(|((contig, realigner), contig_aligner)| {
             let suspicious_qnames = collect_supplementary_qnames(&realigner.final_bam)
-                .with_context(|| {
-                    format!("Supplementary scan failed for contig {}", contig.name)
-                })?;
+                .with_context(|| format!("Supplementary scan failed for contig {}", contig.name))?;
 
             if !suspicious_qnames.is_empty() {
-                tracing::warn!(
-                    contig = %contig.name,
-                    count  = suspicious_qnames.len(),
-                    "Suspicious reads on contig — consider tightening input region"
+                log::warn!(
+                    "Suspicious reads on contig — consider tightening input region | contig={} count={}",
+                    contig.name,
+                    suspicious_qnames.len(),
                 );
             }
 
@@ -175,9 +168,7 @@ pub fn run_local_assembly(
                 &suspicious_qnames,
                 &out_bam,
             )
-            .with_context(|| {
-                format!("MM/ML tag transfer failed for contig {}", contig.name)
-            })?;
+            .with_context(|| format!("MM/ML tag transfer failed for contig {}", contig.name))?;
 
             let final_bam = work_dir.join(format!("{}.bam", contig.name));
             let mut sort_job = SamtoolsSort::from_config(&config.config, &out_bam, &final_bam);
@@ -187,22 +178,168 @@ pub fn run_local_assembly(
                 SamtoolsIndex::from_config(&config.config, final_bam.to_str().unwrap());
             let _log = run!(&config.config, &mut index_job)?;
 
-            // contig_aligner.run() already performed sort + index internally
+            // Generate BED from contig→reference alignment (local, pure Rust)
+            let contig_ref_bed = work_dir.join(format!("{}_{}.bed", contig.name, config.config.reference_name));
+            bam_to_aligned_bed(&contig_aligner.final_bam, &contig_ref_bed)
+                .with_context(|| format!("BED generation failed for contig {}", contig.name))?;
+
             Ok(ContigAssemblyResult {
-                contig_name:      contig.name,
-                contig_fasta:     contig.fasta_path,
-                aligned_bam:      final_bam,
-                contig_ref_bam:   contig_aligner.final_bam.clone(),
+                contig_name: contig.name,
+                contig_fasta: contig.fasta_path,
+                aligned_bam: final_bam,
+                contig_ref_bam: contig_aligner.final_bam.clone(),
+                contig_ref_bed,
                 suspicious_reads: suspicious_qnames.into_iter().collect(),
             })
         })
         .collect::<anyhow::Result<Vec<_>>>()?;
 
     guard.cleanup();
+    Ok(results)
+}
+
+// ─── Iterative wrapper ────────────────────────────────────────────────────────
+
+/// Run local assembly iteratively, extending the record set with suspicious
+/// reads at each round until convergence or max_rounds is reached.
+///
+/// Convergence = no suspicious reads across all contigs in a round.
+/// Each round outputs to `work_dir/round_{n}/`.
+pub fn run_local_assembly_iterative(
+    bam_path: &std::path::Path,
+    records: Vec<bam::Record>,
+    builder: &dyn LocalAssemblyBuilder,
+    work_dir: PathBuf,
+    config: LocalAssemblyConfig,
+    max_rounds: u8,
+) -> anyhow::Result<Vec<ContigAssemblyResult>> {
+    let mut current_records = records;
+    let mut results = Vec::new();
+
+    for round in 0..max_rounds {
+        let round_dir = work_dir.join(format!("round_{}", round));
+        std::fs::create_dir_all(&round_dir)?;
+
+        tracing::info!(
+            round,
+            records = current_records.len(),
+            "Starting assembly round"
+        );
+
+        let reads_path = round_dir.join("reads.fastq");
+        let (assembler, polisher) = builder.build(reads_path, &round_dir);
+
+        results = run_local_assembly(
+            &current_records,
+            assembler,
+            polisher,
+            round_dir.clone(),
+            &config,
+        )?;
+
+        // Collect suspicious qnames across all contigs
+        let suspicious_qnames: HashSet<Vec<u8>> = results
+            .iter()
+            .flat_map(|r| r.suspicious_reads.iter().cloned())
+            .collect();
+
+        if suspicious_qnames.is_empty() {
+            tracing::info!(round, "No suspicious reads — assembly converged");
+            break;
+        }
+
+        tracing::info!(
+            round,
+            count = suspicious_qnames.len(),
+            "Suspicious reads found — extending record set for next round"
+        );
+
+        // Extend current records, deduplicating by qname
+        let seen: HashSet<Vec<u8>> = current_records.iter().map(|r| r.qname().to_vec()).collect();
+
+        // Fetch suspicious reads using SA tag positions — no full BAM scan
+        let mut bam_reader = bam::IndexedReader::from_path(bam_path)
+            .context("Cannot open BAM for suspicious read resolution")?;
+
+        let added: Vec<bam::Record> = results
+            .iter()
+            .flat_map(|r| r.suspicious_reads.iter())
+            .filter_map(|qname| {
+                // Find the suspicious record in the realigned BAM to get its SA tag
+                results.iter().find_map(|r| {
+                    let mut reader = bam::Reader::from_path(&r.aligned_bam).ok()?;
+                    reader.records().find_map(|rec| {
+                        let rec = rec.ok()?;
+                        if rec.qname() == qname.as_slice() && rec.flags() & BAM_FSUPPLEMENTARY != 0
+                        {
+                            Some(rec)
+                        } else {
+                            None
+                        }
+                    })
+                })
+            })
+            .filter_map(|supp_rec| {
+                // Resolve primary from original BAM using SA tag
+                let resolved = primary_record(&mut bam_reader, supp_rec);
+                let qname = resolved.qname().to_vec();
+                if !seen.contains(&qname) {
+                    Some(resolved)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        tracing::info!(
+            round,
+            added = added.len(),
+            total = current_records.len() + added.len(),
+            "Extended record set"
+        );
+
+        current_records.extend(added);
+
+        if round == max_rounds - 1 {
+            tracing::warn!(max_rounds, "Maximum rounds reached without convergence");
+        }
+    }
 
     Ok(results)
 }
 
+/// Constructs a FlyeAssembler + MedakaPolisher for each assembly round.
+pub struct FlyeMedakaBuilder {
+    pub config: Config,
+}
+
+pub type BoxedAssembler = Box<dyn Assembler>;
+pub type BoxedPolisher = Box<dyn Polisher>;
+
+pub trait LocalAssemblyBuilder {
+    fn build(&self, reads_path: PathBuf, round_dir: &Path) -> (BoxedAssembler, BoxedPolisher);
+}
+
+impl LocalAssemblyBuilder for FlyeMedakaBuilder {
+    fn build(&self, reads_path: PathBuf, round_dir: &Path) -> (BoxedAssembler, BoxedPolisher) {
+        let assembler = FlyeAssembler::from_config(
+            &self.config,
+            reads_path.clone(),
+            round_dir.join("flye"),
+            "10k".to_string(),
+        );
+
+        let polisher = MedakaPolisher::from_config(
+            &self.config,
+            reads_path,
+            assembler.assembly_fasta(),
+            round_dir.join("medaka"),
+        );
+
+        (Box::new(assembler), Box::new(polisher))
+    }
+}
+
 // ─── Supplementary detection ──────────────────────────────────────────────────
 
 const BAM_FSUPPLEMENTARY: u16 = 0x800;
@@ -261,7 +398,7 @@ fn transfer_methylation_tags(
 
         if let Some(&orig) = original_index.get(rec.qname()) {
             for tag in METHYLATION_TAGS {
-                rec.remove_aux(tag)?;
+                let _ = rec.remove_aux(tag);
                 if let Ok(aux) = orig.aux(tag) {
                     rec.push_aux(tag, aux).with_context(|| {
                         format!(
@@ -279,3 +416,115 @@ fn transfer_methylation_tags(
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use env_logger::init;
+    use log::info;
+
+    use crate::io::bam::fetch_primary_records;
+
+    use super::*;
+
+    /// Integration test: fetch records from the NUP214 locus on chr9 and run
+    /// the full local assembly pipeline.
+    ///
+    /// ABL1 locus — chr9:142,958,315-142,958,913
+    ///
+    /// Requires:
+    /// - A valid indexed BAM at the path set in `bam_path`
+    /// - A valid Pandora `Config` with minimap2, flye, medaka binaries configured
+    /// - Cluster access (or slurm_runner = false for local execution)
+    #[test]
+    fn test_local_assembly() -> anyhow::Result<()> {
+        init();
+
+        let id = "CML2518";
+        let abl_locus = ("chr9", 142_958_315, 142_958_913);
+
+        let config = Config::default();
+
+        let bam_path = PathBuf::from(config.tumoral_bam(id));
+        let work_dir = PathBuf::from(format!(
+            "{}/{id}/{}/asm",
+            config.result_dir, config.tumoral_name
+        ));
+        if work_dir.exists() {
+            std::fs::remove_dir_all(&work_dir)?;
+        }
+        std::fs::create_dir_all(&work_dir)?;
+
+        let assembly_config = LocalAssemblyConfig {
+            min_records: 10,
+            case_id: id.to_string(),
+            config: config.clone(),
+        };
+
+        // Fetch initial primary records from locus
+        let records = fetch_primary_records(&bam_path, Some(abl_locus), None)?;
+
+        info!("Fetched {} primary records from locus", records.len());
+
+        assert!(
+            records.len() >= assembly_config.min_records,
+            "Too few records at locus: {} (need at least {})",
+            records.len(),
+            assembly_config.min_records
+        );
+
+        // Run iterative assembly — extends with suspicious reads each round
+        let results = run_local_assembly_iterative(
+            &bam_path,
+            records,
+            &FlyeMedakaBuilder {
+                config: config.clone(),
+            },
+            work_dir.clone(),
+            assembly_config,
+            3,
+        )?;
+
+        // Assertions
+        assert!(!results.is_empty(), "No contigs assembled");
+
+        info!("Assembled {} contig(s)", results.len());
+
+        for result in &results {
+            info!(
+                "Contig result | contig={} bam={} ref_bam={} suspicious={}",
+                result.contig_name,
+                result.aligned_bam.display(),
+                result.contig_ref_bam.display(),
+                result.suspicious_reads.len(),
+            );
+
+            assert!(
+                result.aligned_bam.exists(),
+                "aligned_bam missing for contig {}",
+                result.contig_name
+            );
+            assert!(
+                result.contig_ref_bam.exists(),
+                "contig_ref_bam missing for contig {}",
+                result.contig_name
+            );
+            assert!(
+                result.contig_fasta.exists(),
+                "contig_fasta missing for contig {}",
+                result.contig_name
+            );
+
+            if !result.suspicious_reads.is_empty() {
+                log::warn!(
+                    "Suspicious reads remain after final round | contig={} count={}",
+                    result.contig_name,
+                    result.suspicious_reads.len(),
+                );
+            }
+        }
+
+        Ok(())
+    }
+}

+ 17 - 0
src/de_novo/medaka.rs

@@ -2,6 +2,7 @@ use std::path::{Path, PathBuf};
 
 use crate::{
     commands::{Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner},
+    config::Config,
     de_novo::Polisher,
 };
 
@@ -18,6 +19,22 @@ pub struct MedakaPolisher {
     pub medaka_consensus_bin: String,
 }
 
+impl MedakaPolisher {
+    pub fn from_config(config: &Config, reads: PathBuf, draft: PathBuf, out_dir: PathBuf) -> Self {
+        Self {
+            reads,
+            draft,
+            out_dir,
+            model: config.medaka_model.clone(),
+            threads: config.medaka_threads.into(),
+            conda_sh: config.conda_sh.clone(),
+            medaka_env: config.medaka_env.clone(),
+            medaka_consensus_bin: config.medaka_consensus_bin.clone(),
+            slurm_mem: config.medaka_slurm_mem.clone(),
+        }
+    }
+}
+
 impl JobCommand for MedakaPolisher {
     fn cmd(&self) -> String {
         format!(

+ 180 - 7
src/io/bam.rs

@@ -1,15 +1,16 @@
-use std::collections::{HashMap, HashSet};
+use std::io::Write;
+use std::{
+    collections::{HashMap, HashSet}, fs::File, path::Path
+};
 
 use anyhow::Context;
 use log::{info, warn};
-use rust_htslib::bam::{self, record::Aux, record::Cigar, IndexedReader, Read, Record};
-
-use crate::{
-    commands::samtools::{SamtoolsIndex, SamtoolsReheader},
-    config::Config,
-    runners::Run,
+use rust_htslib::bam::{
+    self, FetchDefinition, IndexedReader, Read, Record, ext::BamRecordExtensions, record::{Aux, Cigar}
 };
 
+use crate::{commands::samtools::SamtoolsReheader, config::Config, runners::Run};
+
 /// Parses an SA tag string and extracts chromosome and position information.
 ///
 /// # Arguments
@@ -855,6 +856,178 @@ pub fn read_sm_tag_or_inject(
         .context(format!("No @RG lines found in {bam_path}"))
 }
 
+/// Fetch primary records from a BAM file, with optional region and qname filtering.
+///
+/// # Arguments
+/// * `bam_path`     - Path to indexed BAM file
+/// * `region`       - Optional region to fetch (chr, start, end). None = whole file.
+/// * `qname_filter` - Optional qname filter. None = no filter.
+///
+/// # Notes
+/// - Supplementary alignments are resolved to their primary via SA tag using `primary_records`
+/// - White/blacklist are mutually exclusive — passing both is a logic error and will panic in debug
+pub fn fetch_primary_records(
+    bam_path: &Path,
+    region: Option<(&str, i64, i64)>,
+    qname_filter: Option<QnameFilter>,
+) -> anyhow::Result<Vec<bam::Record>> {
+    debug_assert!(
+        !matches!(
+            &qname_filter,
+            Some(QnameFilter::Whitelist(_)) | Some(QnameFilter::Blacklist(_))
+        ) || qname_filter.is_some(),
+        "Cannot pass both whitelist and blacklist"
+    );
+
+    let mut reader = bam::IndexedReader::from_path(bam_path)
+        .with_context(|| format!("Cannot open BAM: {}", bam_path.display()))?;
+
+    if let Some((chr, start, end)) = region {
+        reader
+            .fetch((chr, start, end))
+            .with_context(|| format!("Fetch failed for {}:{}-{}", chr, start, end))?;
+    } else {
+        reader
+            .fetch(FetchDefinition::All)
+            .context("Fetch all failed")?;
+    }
+
+    let records: Vec<bam::Record> = reader
+        .records()
+        .filter_map(|r| r.ok())
+        .filter(|r| !r.is_secondary())
+        .collect();
+
+    // Resolve supplementary → primary via SA tag
+    let records = primary_records(&mut reader, records);
+
+    // Deduplicate by qname (primary_records may introduce duplicates
+    // if a primary was already in the fetch region)
+    let mut seen = HashSet::new();
+    let mut records: Vec<bam::Record> = records
+        .into_iter()
+        .filter(|r| seen.insert(r.qname().to_vec()))
+        .collect();
+
+    // Apply qname filter
+    if let Some(filter) = qname_filter {
+        match filter {
+            QnameFilter::Whitelist(names) => {
+                records.retain(|r| names.contains(r.qname()));
+            }
+            QnameFilter::Blacklist(names) => {
+                records.retain(|r| !names.contains(r.qname()));
+            }
+        }
+    }
+
+    Ok(records)
+}
+
+pub enum QnameFilter {
+    Whitelist(HashSet<Vec<u8>>),
+    Blacklist(HashSet<Vec<u8>>),
+}
+
+/// Convert an input BAM/CRAM file into a BED file containing one interval per
+/// aligned record.
+///
+/// Each output line corresponds to a single aligned read and has the form:
+///
+/// ```text
+/// chrom    start    end    read_name
+/// ```
+///
+/// # Behavior
+///
+/// - Unmapped records are skipped.
+/// - Chromosome names are taken from the BAM header.
+/// - Coordinates follow BED conventions:
+///   - `start` is **0-based inclusive**
+///   - `end` is **0-based exclusive**
+/// - The interval `[start, end)` is computed using:
+///   - [`bam::Record::pos`] for the start
+///   - [`bam::Record::reference_end`] for the end
+/// - The end position accounts for all CIGAR operations consuming reference
+///   (`M`, `D`, `N`, `=`, `X`).
+///
+/// # Arguments
+///
+/// * `input_bam` - Path to input BAM or CRAM file
+/// * `output_bed` - Path to output BED file
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - The input file cannot be opened or read
+/// - The output file cannot be created or written
+/// - UTF-8 conversion of reference names or read names fails
+///
+/// # Example
+///
+/// ```no_run
+/// use std::path::Path;
+///
+/// fn main() -> anyhow::Result<()> {
+///     bam_to_aligned_bed("input.bam", "output.bed")?;
+///     Ok(())
+/// }
+/// ```
+///
+/// # Notes
+///
+/// - Secondary, supplementary, and duplicate reads are **not filtered**.
+///   Add additional flags if stricter filtering is required.
+/// - For paired-end reads, each alignment record is emitted independently.
+///
+/// # See also
+///
+/// - [`rust_htslib::bam::Reader`]
+/// - [`rust_htslib::bam::Record`]
+pub fn bam_to_aligned_bed<P: AsRef<Path>>(
+    input_bam: P,
+    output_bed: P,
+) -> anyhow::Result<()> {
+    let mut bam = bam::Reader::from_path(input_bam)?;
+    let header = bam.header().to_owned();
+
+    let out = File::create(output_bed)?;
+    let mut writer = std::io::BufWriter::new(out);
+
+    for result in bam.records() {
+        let record = result?;
+
+        if record.is_unmapped() {
+            continue;
+        }
+
+        let tid = record.tid();
+        if tid < 0 {
+            continue;
+        }
+
+        let chrom = std::str::from_utf8(header.tid2name(tid as u32))?;
+
+        // BAM positions are already 0-based.
+        let start = record.pos();
+
+        // reference_end() accounts for CIGAR operations consuming reference:
+        // M, D, N, =, X
+        let end = record.reference_end();
+
+        if end > start {
+            let qname = std::str::from_utf8(record.qname())?;
+
+            writeln!(
+                writer,
+                "{chrom}\t{start}\t{end}\t{qname}"
+            )?;
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;