use std::{ fs, io::{Read, Write}, path::{Path, PathBuf}, time::SystemTime, }; use duct::cmd; use log::{info, warn}; use uuid::Uuid; use crate::{config::Config, pod5::FlowCellCase}; pub trait Run { fn run(self) -> anyhow::Result<()>; } #[derive(Debug, Clone)] pub struct DoradoParams { pub ref_fa: String, pub ref_mmi: String, pub name: String, pub time: String, pub pod_dir: String, pub samtools_view_threads: u16, pub samtools_sort_threads: u16, } pub struct Dorado { config: Config, case: FlowCellCase, case_dir: String, time_dir: String, bam: String, start_time: SystemTime, end_time: SystemTime, is_done: bool, log: Vec, } impl Dorado { pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result { let data_dir = "/data/longreads_basic_pipe"; let case_dir = format!("{}/{}", data_dir, case.id); let time_dir = format!("{}/{}", case_dir, case.time_point); let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point); Ok(Self { config, start_time: SystemTime::now(), end_time: SystemTime::now(), is_done: false, log: Vec::new(), case_dir, time_dir, bam, case, }) } fn create_reference_mmi(&self) -> anyhow::Result<()> { if !std::path::Path::new(&self.config.align.ref_mmi).exists() { cmd!( "minimap2", "-x", "map-ont", "-d", &self.config.align.ref_mmi, &self.config.align.ref_fa ) .run()?; } Ok(()) } fn create_directories(&self) -> anyhow::Result<()> { if !std::path::Path::new(&self.case_dir).exists() { fs::create_dir(&self.case_dir)?; } if !std::path::Path::new(&self.time_dir).exists() { fs::create_dir(&self.time_dir)?; } Ok(()) } fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> { let pod_dir = &self.case.pod_dir.display(); let ref_mmi = &self.config.align.ref_mmi; let bam = &self.bam; let samtools_view_threads = self.config.align.samtools_view_threads; let samtools_sort_threads = self.config.align.samtools_sort_threads; let dorado = format!( "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}" ); let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin"); let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}"); let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}"); info!("Running: {pipe}"); let pipe_cmd = cmd!("bash", "-c", pipe); let mut reader = pipe_cmd.stdout_capture().reader()?; let mut buffer = [0; 1]; let mut line = String::new(); loop { match reader.read(&mut buffer) { Ok(0) => break, // End of output Ok(_) => { let char = buffer[0] as char; eprint!("{}", char); std::io::stderr().flush()?; if char == '\n' { // Send the complete line self.log.push(line.clone()); line.clear(); } else { line.push(char); } } Err(err) => { warn!("Error reading from stderr: {}", err); break; } } } Ok(()) } pub fn index(&self) -> anyhow::Result<()> { let t = self.config.align.samtools_view_threads.to_string(); let cmd = format!("index -@ {t} {}", &self.bam); info!("Running samtools {cmd}"); cmd!("samtools", "index", "-@", &t, &self.bam).run()?; Ok(()) } fn run_cramino(&self) -> anyhow::Result<()> { let cramino_out = format!( "{}/{}_{}_hs1_cramino.txt", self.time_dir, self.case.id, self.case.time_point ); // if !Path::new(&cramino_out).exists() { info!("Quality control of BAM: {}", self.bam); let output = duct::cmd!( "cramino", "-t", "150", "--hist", "--checksum", "--karyotype", &self.bam ) .stdout_capture() .unchecked() .run()?; fs::write(cramino_out, output.stdout)?; // } Ok(()) } fn run_modkit(&self) -> anyhow::Result<()> { let mod_summary = format!( "{}/{}_{}_5mC_5hmC_summary.txt", self.time_dir, self.case.id, self.case.time_point ); // if !Path::new(&mod_summary).exists() { info!("Generating base modification summary for BAM: {}", self.bam); let output = cmd!("modkit", "summary", "-t", "50", &self.bam) .stdout_capture() .unchecked() .run()?; fs::write(mod_summary, output.stdout)?; // } Ok(()) } fn create_fastq(&self) -> anyhow::Result<()> { let bam = &self.bam; let fastq = format!( "{}/{}/{}/{}_{}.fastq.gz", self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point ); if !std::path::Path::new(&fastq).exists() { let samtools = format!("samtools fastq -@ 150 {bam}"); let crabz = format!("crabz -f bgzf - -o {fastq}"); let pipe = format!("{samtools} | {crabz}"); info!("Running: {pipe}"); let pipe_cmd = duct::cmd!("bash", "-c", pipe); pipe_cmd.run()?; } Ok(()) } fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> { let into = PathBuf::from(&self.bam); let dir = into.parent().unwrap(); let original_file = into.file_name().unwrap().to_string_lossy().to_string(); let original_i = dir.join(format!("{original_file}.bai")); if !original_i.exists() { self.index()?; } let tmp_original_file = format!("{}.bam", Uuid::new_v4()); let tmp_original = dir.join(tmp_original_file.clone()); let tmp_original_i = dir.join(format!("{tmp_original_file}.bai")); info!("Moving {} to {}", &into.display(), &tmp_original.display()); fs::rename(&into, &tmp_original)?; info!( "Moving {} to {}", &original_i.display(), &tmp_original_i.display() ); fs::rename(original_i, tmp_original_i.clone())?; let cmd = format!( "samtools merge -@ 160 -h {} {} {} {}", bam.display(), into.display(), bam.display(), tmp_original.display() ); info!("Running {cmd}"); cmd!( "samtools", "merge", "-@", "160", "-h", bam, into, bam, tmp_original.clone() ) .run()?; fs::remove_file(tmp_original)?; fs::remove_file(tmp_original_i)?; self.index()?; Ok(()) } pub fn from_mux(cases: Vec, config: Config) -> anyhow::Result<()> { // Creating a temporary directory let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4()); info!("Creating tmp dir {tmp_dir}"); fs::create_dir(&tmp_dir)?; // Dorado base calling and align into a temporary bam file let muxed_bam = format!("{tmp_dir}/muxed.bam"); let dorado_bin = &config.align.dorado_bin; let dorado_arg = &config.align.dorado_basecall_arg; let pod_dir = cases[0].pod_dir.display(); let ref_mmi = &config.align.ref_mmi; let samtools_view_threads = config.align.samtools_view_threads; let dorado = format!( "{dorado_bin} basecaller {dorado_arg} {pod_dir} --trim all --reference {ref_mmi}" ); let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}"); let pipe = format!("{dorado} | {samtools_view}"); info!("Running: {pipe}"); let pipe_cmd = cmd!("bash", "-c", pipe); pipe_cmd.run()?; info!("Basecalling ✅"); // Demux the temporary bam file // Get the sequencing kit from the first pod5 file let muxed_pod_dir = &cases.first().unwrap().pod_dir; let pod_path = fs::read_dir(muxed_pod_dir)? .filter_map(|p| p.ok()) .map(|p| p.path()) .filter(|p| p.extension().unwrap() == "pod5") .take(1) .collect::>() .pop() .unwrap(); let sequencing_kit = pandora_lib_pod5::Pod5Info::from_pod5(pod_path.to_str().unwrap()) .sequencing_kit .to_uppercase(); let tmp_demux_dir = format!("{tmp_dir}/demuxed"); fs::create_dir(&tmp_demux_dir)?; info!("Demux from {sequencing_kit} into {tmp_demux_dir}",); duct::cmd!( &config.align.dorado_bin, "demux", "--output-dir", &tmp_demux_dir, "--kit-name", &sequencing_kit, &tmp_dir, ) .run()?; info!("Demux ✅"); for case in cases.iter() { let bam = format!( "{tmp_demux_dir}/{sequencing_kit}_barcode{}.bam", case.barcode ); // Trim let trimmed_bam = format!( "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam", case.barcode ); let pipe = format!( "{} trim {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}", config.align.dorado_bin, &config.align.samtools_view_threads ); cmd!("bash", "-c", pipe).run()?; // Align let aligned_bam = format!( "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam", case.barcode ); let dorado = format!( "{} aligner --threads 160 {} {trimmed_bam}", config.align.dorado_bin, config.align.ref_fa, ); let samtools_view = format!( "samtools view -h -@ {} -b /dev/stdin", &config.align.samtools_view_threads ); let samtools_sort = format!( "samtools sort -@ {} /dev/stdin -o {aligned_bam}", &config.align.samtools_sort_threads ); let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}"); info!("Running {pipe}"); cmd!("bash", "-c", pipe).run()?; let d = Dorado::init(case.clone(), config.clone())?; d.create_directories()?; if PathBuf::from(&d.bam).exists() { info!("merge"); d.merge_bam(&PathBuf::from(aligned_bam))?; } else { info!("Moving from {} to {}", bam, d.bam); fs::rename(aligned_bam, d.bam.clone())?; d.index()?; } d.run_cramino()?; d.run_modkit()?; } fs::remove_dir(tmp_dir)?; Ok(()) } pub fn run_pipe(&mut self) -> anyhow::Result<()> { let start_time = std::time::SystemTime::now(); self.start_time = start_time; info!("Running Dorado with params: {:#?}", self.config); let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado"; self.create_reference_mmi()?; self.create_directories()?; info!( "Reading {} pod5 from: {}", self.case.time_point, self.config.pod_dir ); let bam_path = std::path::Path::new(&self.bam); if !bam_path.exists() { self.basecall_align(dorado_bin)?; } else { let new_bam_path = bam_path .parent() .unwrap() .join(format!("{}.bam", Uuid::new_v4())); warn!("Creating new bam {}", new_bam_path.display()); self.basecall_align(dorado_bin)?; self.merge_bam(&new_bam_path)?; } self.run_cramino()?; self.run_modkit()?; self.create_fastq()?; let end_time = std::time::SystemTime::now(); self.end_time = end_time; let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64(); info!( "Dorado and Minimap2 execution time: {} seconds", execution_time ); self.is_done = true; Ok(()) } }