| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- use std::{
- fs::{self, File},
- path::{Path, PathBuf},
- time::SystemTime,
- };
- use duct::cmd;
- use log::{debug, info, warn};
- use uuid::Uuid;
- use crate::{
- collection::{bam::bam_compo, flowcells::FlowCell, pod5::FlowCellCase},
- config::Config,
- helpers::find_unique_file,
- io::pod5_infos::Pod5Info,
- };
- #[derive(Debug, Clone)]
- pub struct DoradoParams {
- pub ref_fa: String,
- pub ref_mmi: String,
- pub name: String,
- pub time: String,
- pub pod_dir: String,
- pub samtools_view_threads: u16,
- pub samtools_sort_threads: u16,
- }
- pub struct Dorado {
- config: Config,
- case: FlowCellCase,
- case_dir: String,
- time_dir: String,
- bam: String,
- start_time: SystemTime,
- end_time: SystemTime,
- is_done: bool,
- }
- impl Dorado {
- pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
- let data_dir = &config.result_dir;
- let case_dir = format!("{}/{}", data_dir, case.id);
- let time_dir = format!("{}/{}", case_dir, case.time_point);
- let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
- debug!("Dorado init with config: {config:#?}");
- info!("Final BAM file: {bam}");
- Ok(Self {
- config,
- start_time: SystemTime::now(),
- end_time: SystemTime::now(),
- is_done: false,
- case_dir,
- time_dir,
- bam,
- case,
- })
- }
- fn create_reference_mmi(&self) -> anyhow::Result<()> {
- if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
- cmd!(
- "minimap2",
- "-x",
- "map-ont",
- "-d",
- &self.config.align.ref_mmi,
- &self.config.align.ref_fa
- )
- .run()?;
- }
- Ok(())
- }
- fn create_directories(&self) -> anyhow::Result<()> {
- if !std::path::Path::new(&self.case_dir).exists() {
- info!("Creating directory {}", self.case_dir);
- fs::create_dir(&self.case_dir)?;
- }
- if !std::path::Path::new(&self.time_dir).exists() {
- info!("Creating directory {}", self.time_dir);
- fs::create_dir(&self.time_dir)?;
- }
- Ok(())
- }
- fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
- let pod_dir = &self.case.pod_dir;
- let ref_mmi = &self.config.align.ref_mmi;
- let bam = &self.bam;
- let samtools_view_threads = self.config.align.samtools_view_threads;
- let samtools_sort_threads = self.config.align.samtools_sort_threads;
- let dorado_arg = self.config.align.dorado_basecall_arg.clone();
- let pod_path = fs::read_dir(pod_dir)
- .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))?
- .filter_map(|p| p.ok())
- .map(|p| p.path())
- .filter(|p| p.extension().unwrap() == "pod5")
- .take(1)
- .collect::<Vec<PathBuf>>()
- .pop()
- .unwrap();
- let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
- .sequencing_kit
- .to_uppercase();
- let dorado = format!(
- "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_mmi} ",
- pod_dir.display()
- );
- info!("running Dorado: {dorado}");
- let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
- let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
- let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
- info!("Running: {pipe}");
- let pipe_cmd = cmd!("bash", "-c", &pipe);
- pipe_cmd
- .run()
- .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
- // let pipe_cmd = cmd!("bash", "-c", pipe);
- // let mut reader = pipe_cmd.stderr_capture().reader()?;
- //
- // let mut buffer = [0; 1];
- // let mut line = String::new();
- //
- // loop {
- // match reader.read(&mut buffer) {
- // Ok(0) => break, // End of output
- // Ok(_) => {
- // let char = buffer[0] as char;
- // eprint!("-{}", char);
- // std::io::stderr().flush()?;
- //
- // if char == '\n' {
- // // Send the complete line
- // self.log.push(line.clone());
- // line.clear();
- // } else {
- // line.push(char);
- // }
- // }
- // Err(err) => {
- // warn!("Error reading from stderr: {}", err);
- // break;
- // }
- // }
- // }
- Ok(())
- }
- pub fn index(&self) -> anyhow::Result<()> {
- let t = self.config.align.samtools_view_threads.to_string();
- let cmd = format!("index -@ {t} {}", &self.bam);
- info!("Running samtools {cmd}");
- cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
- Ok(())
- }
- pub fn run_cramino(&self) -> anyhow::Result<()> {
- let cramino_out = format!(
- "{}/{}_{}_hs1_cramino.txt",
- self.time_dir, self.case.id, self.case.time_point
- );
- info!("Quality control with cramino for BAM: {}", self.bam);
- let output = duct::cmd!("cramino", "-t", "150", "--karyotype", &self.bam)
- .stdout_capture()
- .unchecked()
- .run()?;
- fs::write(cramino_out, output.stdout)?;
- Ok(())
- }
- pub fn run_modkit(&self) -> anyhow::Result<()> {
- let mod_summary = format!(
- "{}/{}_{}_5mC_5hmC_summary.txt",
- self.time_dir, self.case.id, self.case.time_point
- );
- info!("Generating base modification summary for BAM: {}", self.bam);
- let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
- .stdout_capture()
- .unchecked()
- .run()?;
- fs::write(mod_summary, output.stdout)?;
- Ok(())
- }
- pub fn create_fastq(&self) -> anyhow::Result<()> {
- let bam = &self.bam;
- let fastq = format!(
- "{}/{}/{}/{}_{}.fastq.gz",
- self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
- );
- if !std::path::Path::new(&fastq).exists() {
- let samtools = format!("samtools fastq -@ 150 {bam}");
- let crabz = format!("crabz -f bgzf - -o {fastq}");
- let pipe = format!("{samtools} | {crabz}");
- info!("Running: {pipe}");
- let pipe_cmd = duct::cmd!("bash", "-c", pipe);
- pipe_cmd.run()?;
- }
- Ok(())
- }
- pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
- let composition_a: Vec<String> = bam_compo(bam.to_string_lossy().as_ref(), 20000)?
- .iter()
- .map(|(i, _, _)| i.clone())
- .collect();
- let composition_b: Vec<String> = bam_compo(&self.bam, 20000)?
- .iter()
- .map(|(i, _, _)| i.clone())
- .collect();
- let n_id = composition_a
- .iter()
- .filter(|id| composition_b.contains(id))
- .count();
- if n_id > 0 {
- warn!(
- "{} is already merged, reads with the same run_id in the destination BAM.",
- self.case.id
- );
- return Ok(());
- }
- let into = PathBuf::from(&self.bam);
- let dir = into.parent().unwrap();
- let original_file = into.file_name().unwrap().to_string_lossy().to_string();
- let original_i = dir.join(format!("{original_file}.bai"));
- if !original_i.exists() {
- self.index()?;
- }
- let tmp_original_file = format!("{}.bam", Uuid::new_v4());
- let tmp_original = dir.join(tmp_original_file.clone());
- let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
- info!("Moving {} to {}", &into.display(), &tmp_original.display());
- fs::rename(&into, &tmp_original)?;
- info!(
- "Moving {} to {}",
- &original_i.display(),
- &tmp_original_i.display()
- );
- fs::rename(original_i, tmp_original_i.clone())?;
- let cmd = format!(
- "samtools merge -@ 160 -h {} {} {} {}",
- bam.display(),
- into.display(),
- bam.display(),
- tmp_original.display()
- );
- info!("Running {cmd}");
- cmd!(
- "samtools",
- "merge",
- "-@",
- "160",
- "-h",
- bam,
- into,
- bam,
- tmp_original.clone()
- )
- .run()?;
- fs::remove_file(tmp_original)?;
- fs::remove_file(tmp_original_i)?;
- fs::remove_file(bam)?;
- self.index()?;
- Ok(())
- }
- pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
- // Creating a temporary directory
- let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
- info!("Creating tmp dir {tmp_dir}");
- fs::create_dir(&tmp_dir)?;
- // Dorado base calling and align into a temporary bam file
- let muxed_bam = format!("{tmp_dir}/muxed.bam");
- let dorado_bin = &config.align.dorado_bin;
- let dorado_arg = &config.align.dorado_basecall_arg;
- let pod_dir = cases[0].pod_dir.display();
- let ref_mmi = &config.align.ref_mmi;
- let samtools_view_threads = config.align.samtools_view_threads;
- // Get the sequencing kit from the first pod5 file
- let muxed_pod_dir = &cases.first().unwrap().pod_dir;
- let pod_path = fs::read_dir(muxed_pod_dir)
- .map_err(|e| {
- anyhow::anyhow!(
- "Failed to read pod5 dir: {}.\n\t{e}",
- muxed_pod_dir.display()
- )
- })?
- .filter_map(|p| p.ok())
- .map(|p| p.path())
- .filter(|p| p.extension().unwrap() == "pod5")
- .take(1)
- .collect::<Vec<PathBuf>>()
- .pop()
- .unwrap();
- let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
- .sequencing_kit
- .to_uppercase();
- let dorado = format!(
- "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
- );
- let samtools_view =
- format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
- let pipe = format!("{dorado} | {samtools_view}");
- info!("Running: {pipe}");
- let pipe_cmd = cmd!("bash", "-c", &pipe);
- pipe_cmd
- .run()
- .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))?;
- info!("Basecalling ✅");
- // Demux the temporary bam file
- let tmp_demux_dir = format!("{tmp_dir}/demuxed");
- fs::create_dir(&tmp_demux_dir)?;
- let pipe = format!(
- "samtools split -@ {} -f '{tmp_demux_dir}/%*_%!.bam' {muxed_bam}",
- config.align.samtools_view_threads
- );
- info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
- info!("Running: {pipe}");
- let pipe_cmd = cmd!("bash", "-c", pipe);
- pipe_cmd.run()?;
- info!("Demux ✅");
- for case in cases.iter() {
- let barcode = case.barcode.replace("NB", "");
- let bam = find_unique_file(
- &tmp_demux_dir,
- &format!("{sequencing_kit}_barcode{}.bam", barcode),
- )?;
- // Align
- let aligned_bam = format!(
- "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
- barcode
- );
- let dorado = format!(
- // "{} aligner --threads 160 {} {trimmed_bam}",
- "{} aligner --threads 160 {} {bam}",
- config.align.dorado_bin, config.align.ref_fa,
- );
- let samtools_view = format!(
- "samtools view -h -@ {} -b /dev/stdin",
- &config.align.samtools_view_threads
- );
- let samtools_sort = format!(
- "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
- &config.align.samtools_sort_threads
- );
- let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
- info!("Running {pipe}");
- cmd!("bash", "-c", pipe).run()?;
- info!("Alignement ✅");
- let d = Dorado::init(case.clone(), config.clone())?;
- d.create_directories()?;
- if PathBuf::from(&d.bam).exists() {
- info!("Merging");
- d.merge_bam(&PathBuf::from(aligned_bam))?;
- } else {
- info!("Moving from {} to {}", bam, d.bam);
- fs::rename(aligned_bam, d.bam.clone())?;
- d.index()?;
- }
- d.run_cramino()?;
- d.run_modkit()?;
- }
- fs::remove_dir_all(tmp_dir)?;
- Ok(())
- }
- pub fn run_pipe(&mut self) -> anyhow::Result<()> {
- let start_time = std::time::SystemTime::now();
- self.start_time = start_time;
- debug!("Running Dorado with config: {:#?}", self.config);
- let dorado_bin = self.config.align.dorado_bin.clone();
- self.create_reference_mmi()?;
- self.create_directories()?;
- info!(
- "Reading {} pod5 from: {}",
- self.case.time_point, self.config.pod_dir
- );
- let bam_path = std::path::Path::new(&self.bam);
- if !bam_path.exists() {
- info!("Creating new bam file");
- self.basecall_align(&dorado_bin)?;
- self.index()?;
- } else {
- // check if merge before call
- let new_bam_path = bam_path
- .parent()
- .unwrap()
- .join(format!("{}.bam", Uuid::new_v4()));
- warn!("Creating new bam {}", new_bam_path.display());
- let bam = self.bam.clone();
- self.bam = new_bam_path.clone().to_string_lossy().to_string();
- self.basecall_align(&dorado_bin)?;
- self.bam.clone_from(&bam);
- self.merge_bam(&new_bam_path)?;
- }
- self.run_cramino()?;
- self.run_modkit()?;
- // self.create_fastq()?;
- let end_time = std::time::SystemTime::now();
- self.end_time = end_time;
- let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
- info!(
- "Dorado and Minimap2 execution time: {} seconds",
- execution_time
- );
- self.is_done = true;
- Ok(())
- }
- pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
- let tp_conv = |time_point: &str| -> String {
- match time_point {
- "normal" => config.normal_name.clone(),
- "tumoral" => config.tumoral_name.clone(),
- _ => panic!("Error time point name"),
- }
- };
- use crate::collection::flowcells::FlowCellLocation::*;
- let base_pod_dir = match &flowcell.location {
- Local(_) => None,
- Archived(pod_tar) => {
- let file = File::open(pod_tar)
- .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?;
- let mut archive = tar::Archive::new(file);
- info!("Un-tar of archived {pod_tar}");
- archive
- .unpack(&config.unarchive_tmp_dir)
- .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?;
- info!("Un-tar of archived {pod_tar} Done.");
- Some(config.unarchive_tmp_dir.to_string())
- }
- };
- use crate::collection::flowcells::FlowCellExperiment::*;
- match &flowcell.experiment {
- WGSPod5Mux(pod_dir) => {
- let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
- format!("{base_pod_dir}/{pod_dir}")
- } else {
- pod_dir.clone()
- };
- let cases = flowcell
- .cases
- .iter()
- .map(|c| FlowCellCase {
- id: c.case_id.clone(),
- time_point: tp_conv(&c.sample_type),
- barcode: c.barcode.clone(),
- pod_dir: pod_dir.clone().into(),
- })
- .collect();
- info!("Starting basecaller for muxed pod5: {cases:#?}");
- Dorado::from_mux(cases, config.clone())?;
- }
- WGSPod5Demux(pod_dir) => {
- let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
- format!("{base_pod_dir}/{pod_dir}")
- } else {
- pod_dir.clone()
- };
- for c in flowcell.cases.iter() {
- let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", ""));
- info!("Starting basecaller for demuxed pod5: {pod_dir}");
- let mut d = Dorado::init(
- FlowCellCase {
- id: c.case_id.clone(),
- time_point: tp_conv(&c.sample_type),
- barcode: c.barcode.clone(),
- pod_dir: pod_dir.into(),
- },
- config.clone(),
- )?;
- d.run_pipe()?;
- }
- }
- }
-
- Ok(())
- }
- }
|