use std::{ fs::{self, File}, path::{Path, PathBuf}, time::SystemTime, }; use duct::cmd; use log::{debug, info, warn}; use uuid::Uuid; use crate::{ collection::{bam::bam_compo, flowcells::FlowCell, pod5::FlowCellCase}, config::Config, helpers::find_unique_file, io::pod5_infos::Pod5Info, }; #[derive(Debug, Clone)] pub struct DoradoParams { pub ref_fa: String, pub ref_mmi: String, pub name: String, pub time: String, pub pod_dir: String, pub samtools_view_threads: u16, pub samtools_sort_threads: u16, } pub struct Dorado { config: Config, case: FlowCellCase, case_dir: String, time_dir: String, bam: String, start_time: SystemTime, end_time: SystemTime, is_done: bool, } impl Dorado { pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result { let data_dir = &config.result_dir; let case_dir = format!("{}/{}", data_dir, case.id); let time_dir = format!("{}/{}", case_dir, case.time_point); let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point); debug!("Dorado init with config: {config:#?}"); info!("Final BAM file: {bam}"); Ok(Self { config, start_time: SystemTime::now(), end_time: SystemTime::now(), is_done: false, case_dir, time_dir, bam, case, }) } fn create_reference_mmi(&self) -> anyhow::Result<()> { if !std::path::Path::new(&self.config.align.ref_mmi).exists() { cmd!( "minimap2", "-x", "map-ont", "-d", &self.config.align.ref_mmi, &self.config.align.ref_fa ) .run()?; } Ok(()) } fn create_directories(&self) -> anyhow::Result<()> { if !std::path::Path::new(&self.case_dir).exists() { info!("Creating directory {}", self.case_dir); fs::create_dir(&self.case_dir)?; } if !std::path::Path::new(&self.time_dir).exists() { info!("Creating directory {}", self.time_dir); fs::create_dir(&self.time_dir)?; } Ok(()) } fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> { let pod_dir = &self.case.pod_dir; let ref_mmi = &self.config.align.ref_mmi; let bam = &self.bam; let samtools_view_threads = self.config.align.samtools_view_threads; let samtools_sort_threads = self.config.align.samtools_sort_threads; let dorado_arg = self.config.align.dorado_basecall_arg.clone(); let pod_path = fs::read_dir(pod_dir) .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))? .filter_map(|p| p.ok()) .map(|p| p.path()) .filter(|p| p.extension().unwrap() == "pod5") .take(1) .collect::>() .pop() .unwrap(); let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap()) .sequencing_kit .to_uppercase(); let dorado = format!( "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_mmi} ", pod_dir.display() ); info!("running Dorado: {dorado}"); let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin"); let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}"); let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}"); info!("Running: {pipe}"); let pipe_cmd = cmd!("bash", "-c", &pipe); pipe_cmd .run() .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?; // let pipe_cmd = cmd!("bash", "-c", pipe); // let mut reader = pipe_cmd.stderr_capture().reader()?; // // let mut buffer = [0; 1]; // let mut line = String::new(); // // loop { // match reader.read(&mut buffer) { // Ok(0) => break, // End of output // Ok(_) => { // let char = buffer[0] as char; // eprint!("-{}", char); // std::io::stderr().flush()?; // // if char == '\n' { // // Send the complete line // self.log.push(line.clone()); // line.clear(); // } else { // line.push(char); // } // } // Err(err) => { // warn!("Error reading from stderr: {}", err); // break; // } // } // } Ok(()) } pub fn index(&self) -> anyhow::Result<()> { let t = self.config.align.samtools_view_threads.to_string(); let cmd = format!("index -@ {t} {}", &self.bam); info!("Running samtools {cmd}"); cmd!("samtools", "index", "-@", &t, &self.bam).run()?; Ok(()) } pub fn run_cramino(&self) -> anyhow::Result<()> { let cramino_out = format!( "{}/{}_{}_hs1_cramino.txt", self.time_dir, self.case.id, self.case.time_point ); info!("Quality control with cramino for BAM: {}", self.bam); let output = duct::cmd!("cramino", "-t", "150", "--karyotype", &self.bam) .stdout_capture() .unchecked() .run()?; fs::write(cramino_out, output.stdout)?; Ok(()) } pub fn run_modkit(&self) -> anyhow::Result<()> { let mod_summary = format!( "{}/{}_{}_5mC_5hmC_summary.txt", self.time_dir, self.case.id, self.case.time_point ); info!("Generating base modification summary for BAM: {}", self.bam); let output = cmd!("modkit", "summary", "-t", "50", &self.bam) .stdout_capture() .unchecked() .run()?; fs::write(mod_summary, output.stdout)?; Ok(()) } pub fn create_fastq(&self) -> anyhow::Result<()> { let bam = &self.bam; let fastq = format!( "{}/{}/{}/{}_{}.fastq.gz", self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point ); if !std::path::Path::new(&fastq).exists() { let samtools = format!("samtools fastq -@ 150 {bam}"); let crabz = format!("crabz -f bgzf - -o {fastq}"); let pipe = format!("{samtools} | {crabz}"); info!("Running: {pipe}"); let pipe_cmd = duct::cmd!("bash", "-c", pipe); pipe_cmd.run()?; } Ok(()) } pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> { let composition_a: Vec = bam_compo(bam.to_string_lossy().as_ref(), 20000)? .iter() .map(|(i, _, _)| i.clone()) .collect(); let composition_b: Vec = bam_compo(&self.bam, 20000)? .iter() .map(|(i, _, _)| i.clone()) .collect(); let n_id = composition_a .iter() .filter(|id| composition_b.contains(id)) .count(); if n_id > 0 { warn!( "{} is already merged, reads with the same run_id in the destination BAM.", self.case.id ); return Ok(()); } let into = PathBuf::from(&self.bam); let dir = into.parent().unwrap(); let original_file = into.file_name().unwrap().to_string_lossy().to_string(); let original_i = dir.join(format!("{original_file}.bai")); if !original_i.exists() { self.index()?; } let tmp_original_file = format!("{}.bam", Uuid::new_v4()); let tmp_original = dir.join(tmp_original_file.clone()); let tmp_original_i = dir.join(format!("{tmp_original_file}.bai")); info!("Moving {} to {}", &into.display(), &tmp_original.display()); fs::rename(&into, &tmp_original)?; info!( "Moving {} to {}", &original_i.display(), &tmp_original_i.display() ); fs::rename(original_i, tmp_original_i.clone())?; let cmd = format!( "samtools merge -@ 160 -h {} {} {} {}", bam.display(), into.display(), bam.display(), tmp_original.display() ); info!("Running {cmd}"); cmd!( "samtools", "merge", "-@", "160", "-h", bam, into, bam, tmp_original.clone() ) .run()?; fs::remove_file(tmp_original)?; fs::remove_file(tmp_original_i)?; fs::remove_file(bam)?; self.index()?; Ok(()) } pub fn from_mux(cases: Vec, config: Config) -> anyhow::Result<()> { // Creating a temporary directory let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4()); info!("Creating tmp dir {tmp_dir}"); fs::create_dir(&tmp_dir)?; // Dorado base calling and align into a temporary bam file let muxed_bam = format!("{tmp_dir}/muxed.bam"); let dorado_bin = &config.align.dorado_bin; let dorado_arg = &config.align.dorado_basecall_arg; let pod_dir = cases[0].pod_dir.display(); let ref_mmi = &config.align.ref_mmi; let samtools_view_threads = config.align.samtools_view_threads; // Get the sequencing kit from the first pod5 file let muxed_pod_dir = &cases.first().unwrap().pod_dir; let pod_path = fs::read_dir(muxed_pod_dir) .map_err(|e| { anyhow::anyhow!( "Failed to read pod5 dir: {}.\n\t{e}", muxed_pod_dir.display() ) })? .filter_map(|p| p.ok()) .map(|p| p.path()) .filter(|p| p.extension().unwrap() == "pod5") .take(1) .collect::>() .pop() .unwrap(); let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap()) .sequencing_kit .to_uppercase(); let dorado = format!( "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}" ); let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}"); let pipe = format!("{dorado} | {samtools_view}"); info!("Running: {pipe}"); let pipe_cmd = cmd!("bash", "-c", &pipe); pipe_cmd .run() .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?; info!("Basecalling ✅"); // Demux the temporary bam file let tmp_demux_dir = format!("{tmp_dir}/demuxed"); fs::create_dir(&tmp_demux_dir)?; let pipe = format!( "samtools split -@ {} -f '{tmp_demux_dir}/%*_%!.bam' {muxed_bam}", config.align.samtools_view_threads ); info!("Demux from {sequencing_kit} into {tmp_demux_dir}",); info!("Running: {pipe}"); let pipe_cmd = cmd!("bash", "-c", pipe); pipe_cmd.run()?; info!("Demux ✅"); for case in cases.iter() { let barcode = case.barcode.replace("NB", ""); let bam = find_unique_file( &tmp_demux_dir, &format!("{sequencing_kit}_barcode{}.bam", barcode), )?; // Align let aligned_bam = format!( "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam", barcode ); let dorado = format!( // "{} aligner --threads 160 {} {trimmed_bam}", "{} aligner --threads 160 {} {bam}", config.align.dorado_bin, config.align.ref_fa, ); let samtools_view = format!( "samtools view -h -@ {} -b /dev/stdin", &config.align.samtools_view_threads ); let samtools_sort = format!( "samtools sort -@ {} /dev/stdin -o {aligned_bam}", &config.align.samtools_sort_threads ); let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}"); info!("Running {pipe}"); cmd!("bash", "-c", pipe).run()?; info!("Alignement ✅"); let d = Dorado::init(case.clone(), config.clone())?; d.create_directories()?; if PathBuf::from(&d.bam).exists() { info!("Merging"); d.merge_bam(&PathBuf::from(aligned_bam))?; } else { info!("Moving from {} to {}", bam, d.bam); fs::rename(aligned_bam, d.bam.clone())?; d.index()?; } d.run_cramino()?; d.run_modkit()?; } fs::remove_dir_all(tmp_dir)?; Ok(()) } pub fn run_pipe(&mut self) -> anyhow::Result<()> { let start_time = std::time::SystemTime::now(); self.start_time = start_time; debug!("Running Dorado with config: {:#?}", self.config); let dorado_bin = self.config.align.dorado_bin.clone(); self.create_reference_mmi()?; self.create_directories()?; info!( "Reading {} pod5 from: {}", self.case.time_point, self.config.pod_dir ); let bam_path = std::path::Path::new(&self.bam); if !bam_path.exists() { info!("Creating new bam file"); self.basecall_align(&dorado_bin)?; self.index()?; } else { // check if merge before call let new_bam_path = bam_path .parent() .unwrap() .join(format!("{}.bam", Uuid::new_v4())); warn!("Creating new bam {}", new_bam_path.display()); let bam = self.bam.clone(); self.bam = new_bam_path.clone().to_string_lossy().to_string(); self.basecall_align(&dorado_bin)?; self.bam.clone_from(&bam); self.merge_bam(&new_bam_path)?; } self.run_cramino()?; self.run_modkit()?; // self.create_fastq()?; let end_time = std::time::SystemTime::now(); self.end_time = end_time; let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64(); info!( "Dorado and Minimap2 execution time: {} seconds", execution_time ); self.is_done = true; Ok(()) } pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> { let tp_conv = |time_point: &str| -> String { match time_point { "normal" => config.normal_name.clone(), "tumoral" => config.tumoral_name.clone(), _ => panic!("Error time point name"), } }; use crate::collection::flowcells::FlowCellLocation::*; let base_pod_dir = match &flowcell.location { Local(_) => None, Archived(pod_tar) => { let file = File::open(pod_tar) .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?; let mut archive = tar::Archive::new(file); info!("Un-tar of archived {pod_tar}"); archive .unpack(&config.unarchive_tmp_dir) .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?; info!("Un-tar of archived {pod_tar} Done."); Some(config.unarchive_tmp_dir.to_string()) } }; use crate::collection::flowcells::FlowCellExperiment::*; match &flowcell.experiment { WGSPod5Mux(pod_dir) => { let pod_dir = if let Some(base_pod_dir) = &base_pod_dir { format!("{base_pod_dir}/{pod_dir}") } else { pod_dir.clone() }; let cases = flowcell .cases .iter() .map(|c| FlowCellCase { id: c.case_id.clone(), time_point: tp_conv(&c.sample_type), barcode: c.barcode.clone(), pod_dir: pod_dir.clone().into(), }) .collect(); info!("Starting basecaller for muxed pod5: {cases:#?}"); Dorado::from_mux(cases, config.clone())?; } WGSPod5Demux(pod_dir) => { let pod_dir = if let Some(base_pod_dir) = &base_pod_dir { format!("{base_pod_dir}/{pod_dir}") } else { pod_dir.clone() }; for c in flowcell.cases.iter() { let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", "")); info!("Starting basecaller for demuxed pod5: {pod_dir}"); let mut d = Dorado::init( FlowCellCase { id: c.case_id.clone(), time_point: tp_conv(&c.sample_type), barcode: c.barcode.clone(), pod_dir: pod_dir.into(), }, config.clone(), )?; d.run_pipe()?; } } } Ok(()) } }