|
|
@@ -0,0 +1,281 @@
|
|
|
+use std::{
|
|
|
+ collections::{HashMap, HashSet},
|
|
|
+ path::{Path, PathBuf},
|
|
|
+};
|
|
|
+
|
|
|
+use anyhow::Context;
|
|
|
+use rust_htslib::bam::{self, Read};
|
|
|
+
|
|
|
+use crate::{
|
|
|
+ aligner::minimap::{Minimap2AlignOnt, Minimap2Preset},
|
|
|
+ commands::samtools::{SamtoolsIndex, SamtoolsSort},
|
|
|
+ config::Config,
|
|
|
+ de_novo::{Assembler, Polisher},
|
|
|
+ helpers::TempFileGuard,
|
|
|
+ io::{fasta::split_fasta, fastq::write_fastq},
|
|
|
+ run, run_many,
|
|
|
+ runners::Run,
|
|
|
+};
|
|
|
+
|
|
|
+/// Result for a single assembled contig.
|
|
|
+/// The function returns one of these per contig — caller reasons about them
|
|
|
+/// independently.
|
|
|
+pub struct ContigAssemblyResult {
|
|
|
+ pub contig_name: String,
|
|
|
+ /// Single-contig polished FASTA used as alignment reference.
|
|
|
+ pub contig_fasta: PathBuf,
|
|
|
+ /// Reads realigned to this contig with MM/ML/MN tags from original records.
|
|
|
+ pub aligned_bam: PathBuf,
|
|
|
+ /// Read names excluded because they produced supplementary alignments on
|
|
|
+ /// this contig. Non-empty signals that the input region should be tightened.
|
|
|
+ /// Note: a read suspicious on contig A but absent here was cleanly placed
|
|
|
+ /// on this contig — it likely bridges the two contigs.
|
|
|
+ pub suspicious_reads: Vec<Vec<u8>>,
|
|
|
+ pub contig_ref_bam: PathBuf, // contig → reference genome
|
|
|
+}
|
|
|
+
|
|
|
+// ─── Config ───────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+pub struct LocalAssemblyConfig {
|
|
|
+ /// Minimum number of records required before attempting assembly.
|
|
|
+ /// Flye produces fragmented or empty output silently below ~30×.
|
|
|
+ /// Lower for targeted subclonal loci with expected sparse coverage.
|
|
|
+ /// Default: 30.
|
|
|
+ pub min_records: usize,
|
|
|
+ pub case_id: String,
|
|
|
+ pub config: Config,
|
|
|
+}
|
|
|
+
|
|
|
+impl LocalAssemblyConfig {
|
|
|
+ fn from_config(case_id: String, min_records: usize, config: Config) -> Self {
|
|
|
+ Self {
|
|
|
+ min_records,
|
|
|
+ case_id,
|
|
|
+ config,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ─── Pipeline orchestrator ────────────────────────────────────────────────────
|
|
|
+
|
|
|
+/// Orchestrate local de novo assembly from a set of primary htslib records.
|
|
|
+///
|
|
|
+/// Returns one `ContigAssemblyResult` per contig emitted by the assembler.
|
|
|
+/// All path dependencies must be set at construction time by the caller.
|
|
|
+///
|
|
|
+/// Steps 1, 5, 6 run locally (pure Rust, Windows-safe).
|
|
|
+/// Steps 2–4 are dispatched via `run!` / `run_many!` through SlurmRunner.
|
|
|
+pub fn run_local_assembly(
|
|
|
+ records: &[bam::Record],
|
|
|
+ mut assembler: impl Assembler,
|
|
|
+ mut polisher: impl Polisher,
|
|
|
+ work_dir: PathBuf,
|
|
|
+ config: LocalAssemblyConfig,
|
|
|
+) -> anyhow::Result<Vec<ContigAssemblyResult>> {
|
|
|
+ if records.len() < config.min_records {
|
|
|
+ anyhow::bail!(
|
|
|
+ "Too few records for local assembly: {} (minimum {})",
|
|
|
+ records.len(),
|
|
|
+ config.min_records
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step 1 — write FASTQ (local)
|
|
|
+ let reads_path = work_dir.join("reads.fastq");
|
|
|
+ write_fastq(records, &reads_path).context("FASTQ write failed")?;
|
|
|
+
|
|
|
+ // Step 2 — assemble (cluster)
|
|
|
+ run!(&config.config, &mut assembler)?;
|
|
|
+
|
|
|
+ // Step 3 — polish (cluster)
|
|
|
+ // draft set at construction to assembler.assembly_fasta()
|
|
|
+ run!(&config.config, &mut polisher)?;
|
|
|
+
|
|
|
+ // Step 4 — split polished FASTA into per-contig files (local)
|
|
|
+ let contigs_dir = work_dir.join("contigs");
|
|
|
+ let contigs =
|
|
|
+ split_fasta(&polisher.polished_fasta(), &contigs_dir).context("FASTA split failed")?;
|
|
|
+
|
|
|
+ if contigs.is_empty() {
|
|
|
+ anyhow::bail!("Assembler produced no contigs");
|
|
|
+ }
|
|
|
+
|
|
|
+ tracing::info!(
|
|
|
+ count = contigs.len(),
|
|
|
+ "Assembly produced {} contig(s)",
|
|
|
+ contigs.len()
|
|
|
+ );
|
|
|
+
|
|
|
+ let tmp_dir = PathBuf::from(config.config.tmp_dir.clone());
|
|
|
+ let mut guard = TempFileGuard::new();
|
|
|
+
|
|
|
+ // Step 5 — realign reads to each contig independently (cluster, sequential)
|
|
|
+ // Asm5(contig_fasta) — no MMI, reference is the single-contig FASTA
|
|
|
+ let mut realigners: Vec<Minimap2AlignOnt> = contigs
|
|
|
+ .iter()
|
|
|
+ .map(|c| {
|
|
|
+ Minimap2AlignOnt::init(
|
|
|
+ &config.case_id,
|
|
|
+ &config.config,
|
|
|
+ reads_path.clone(),
|
|
|
+ guard.tmp_path(".tmp.bam", &tmp_dir),
|
|
|
+ Minimap2Preset::Asm5(c.fasta_path.clone()),
|
|
|
+ )
|
|
|
+ })
|
|
|
+ .collect::<anyhow::Result<Vec<_>>>()?;
|
|
|
+
|
|
|
+ for realigner in &mut realigners {
|
|
|
+ realigner.run()?;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step 6 — align contigs to reference genome (cluster, sequential)
|
|
|
+ // MapOnt — MMI precomputed inside init if not already present
|
|
|
+ let mut contig_aligners: Vec<Minimap2AlignOnt> = contigs
|
|
|
+ .iter()
|
|
|
+ .map(|c| {
|
|
|
+ Minimap2AlignOnt::init(
|
|
|
+ &config.case_id,
|
|
|
+ &config.config,
|
|
|
+ c.fasta_path.clone(),
|
|
|
+ work_dir.join(format!("{}.ref.bam", c.name)),
|
|
|
+ Minimap2Preset::MapOntRef,
|
|
|
+ )
|
|
|
+ })
|
|
|
+ .collect::<anyhow::Result<Vec<_>>>()?;
|
|
|
+
|
|
|
+ for aligner in &mut contig_aligners {
|
|
|
+ aligner.run()?;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step 7 — per-contig: supplementary scan + tag transfer (local)
|
|
|
+ let original_index = index_records_by_name(records);
|
|
|
+
|
|
|
+ let results = contigs
|
|
|
+ .into_iter()
|
|
|
+ .zip(realigners.iter())
|
|
|
+ .zip(contig_aligners.iter())
|
|
|
+ .map(|((contig, realigner), contig_aligner)| {
|
|
|
+ let suspicious_qnames = collect_supplementary_qnames(&realigner.final_bam)
|
|
|
+ .with_context(|| {
|
|
|
+ format!("Supplementary scan failed for contig {}", contig.name)
|
|
|
+ })?;
|
|
|
+
|
|
|
+ if !suspicious_qnames.is_empty() {
|
|
|
+ tracing::warn!(
|
|
|
+ contig = %contig.name,
|
|
|
+ count = suspicious_qnames.len(),
|
|
|
+ "Suspicious reads on contig — consider tightening input region"
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ let out_bam = guard.tmp_path(".tmp.bam", &tmp_dir);
|
|
|
+ transfer_methylation_tags(
|
|
|
+ &realigner.final_bam,
|
|
|
+ &original_index,
|
|
|
+ &suspicious_qnames,
|
|
|
+ &out_bam,
|
|
|
+ )
|
|
|
+ .with_context(|| {
|
|
|
+ format!("MM/ML tag transfer failed for contig {}", contig.name)
|
|
|
+ })?;
|
|
|
+
|
|
|
+ let final_bam = work_dir.join(format!("{}.bam", contig.name));
|
|
|
+ let mut sort_job = SamtoolsSort::from_config(&config.config, &out_bam, &final_bam);
|
|
|
+ let _log = run!(&config.config, &mut sort_job)?;
|
|
|
+
|
|
|
+ let mut index_job =
|
|
|
+ SamtoolsIndex::from_config(&config.config, final_bam.to_str().unwrap());
|
|
|
+ let _log = run!(&config.config, &mut index_job)?;
|
|
|
+
|
|
|
+ // contig_aligner.run() already performed sort + index internally
|
|
|
+ Ok(ContigAssemblyResult {
|
|
|
+ contig_name: contig.name,
|
|
|
+ contig_fasta: contig.fasta_path,
|
|
|
+ aligned_bam: final_bam,
|
|
|
+ contig_ref_bam: contig_aligner.final_bam.clone(),
|
|
|
+ suspicious_reads: suspicious_qnames.into_iter().collect(),
|
|
|
+ })
|
|
|
+ })
|
|
|
+ .collect::<anyhow::Result<Vec<_>>>()?;
|
|
|
+
|
|
|
+ guard.cleanup();
|
|
|
+
|
|
|
+ Ok(results)
|
|
|
+}
|
|
|
+
|
|
|
+// ─── Supplementary detection ──────────────────────────────────────────────────
|
|
|
+
|
|
|
+const BAM_FSUPPLEMENTARY: u16 = 0x800;
|
|
|
+
|
|
|
+/// Collect read names that have at least one supplementary alignment in `bam`.
|
|
|
+///
|
|
|
+/// Because the reference is a single contig, any supplementary alignment is
|
|
|
+/// unambiguously suspicious — the read spans a discontinuity within that contig,
|
|
|
+/// indicating the input record selection was too broad.
|
|
|
+fn collect_supplementary_qnames(bam: &Path) -> anyhow::Result<HashSet<Vec<u8>>> {
|
|
|
+ let mut reader = bam::Reader::from_path(bam)
|
|
|
+ .with_context(|| format!("Cannot open BAM: {}", bam.display()))?;
|
|
|
+
|
|
|
+ let mut supp_names = HashSet::new();
|
|
|
+
|
|
|
+ for result in reader.records() {
|
|
|
+ let rec = result.context("Error reading record during supplementary scan")?;
|
|
|
+ if rec.flags() & BAM_FSUPPLEMENTARY != 0 {
|
|
|
+ supp_names.insert(rec.qname().to_vec());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(supp_names)
|
|
|
+}
|
|
|
+
|
|
|
+// ─── MM / ML tag transfer ─────────────────────────────────────────────────────
|
|
|
+
|
|
|
+/// Tags carrying base modification data.
|
|
|
+/// MN is the sequence length at call time — required for correct MM parsing.
|
|
|
+const METHYLATION_TAGS: &[&[u8]] = &[b"MM", b"ML", b"MN"];
|
|
|
+
|
|
|
+pub fn index_records_by_name(records: &[bam::Record]) -> HashMap<Vec<u8>, &bam::Record> {
|
|
|
+ records.iter().map(|r| (r.qname().to_vec(), r)).collect()
|
|
|
+}
|
|
|
+
|
|
|
+/// Copy MM/ML/MN tags from original records onto the realigned BAM records.
|
|
|
+/// Reads in `suspicious_qnames` are excluded from the output.
|
|
|
+fn transfer_methylation_tags(
|
|
|
+ raw_bam: &Path,
|
|
|
+ original_index: &HashMap<Vec<u8>, &bam::Record>,
|
|
|
+ suspicious_qnames: &HashSet<Vec<u8>>,
|
|
|
+ out_bam: &Path,
|
|
|
+) -> anyhow::Result<()> {
|
|
|
+ let mut reader = bam::Reader::from_path(raw_bam)
|
|
|
+ .with_context(|| format!("Cannot open BAM: {}", raw_bam.display()))?;
|
|
|
+ let header = bam::Header::from_template(reader.header());
|
|
|
+ let mut writer = bam::Writer::from_path(out_bam, &header, bam::Format::Bam)
|
|
|
+ .with_context(|| format!("Cannot create BAM: {}", out_bam.display()))?;
|
|
|
+
|
|
|
+ for result in reader.records() {
|
|
|
+ let mut rec = result.context("Error reading BAM record")?;
|
|
|
+
|
|
|
+ if suspicious_qnames.contains(rec.qname()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if let Some(&orig) = original_index.get(rec.qname()) {
|
|
|
+ for tag in METHYLATION_TAGS {
|
|
|
+ rec.remove_aux(tag)?;
|
|
|
+ if let Ok(aux) = orig.aux(tag) {
|
|
|
+ rec.push_aux(tag, aux).with_context(|| {
|
|
|
+ format!(
|
|
|
+ "Failed to push {} tag for read {}",
|
|
|
+ std::str::from_utf8(tag).unwrap_or("??"),
|
|
|
+ std::str::from_utf8(rec.qname()).unwrap_or("??"),
|
|
|
+ )
|
|
|
+ })?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ writer.write(&rec).context("Error writing BAM record")?;
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|