| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- use std::{
- fs,
- io::{Read, Write},
- path::{Path, PathBuf},
- time::SystemTime,
- };
- use duct::cmd;
- use log::{info, warn};
- use uuid::Uuid;
- use crate::{config::Config, pod5::FlowCellCase};
- pub trait Run {
- fn run(self) -> anyhow::Result<()>;
- }
- #[derive(Debug, Clone)]
- pub struct DoradoParams {
- pub ref_fa: String,
- pub ref_mmi: String,
- pub name: String,
- pub time: String,
- pub pod_dir: String,
- pub samtools_view_threads: u16,
- pub samtools_sort_threads: u16,
- }
- pub struct Dorado {
- config: Config,
- case: FlowCellCase,
- case_dir: String,
- time_dir: String,
- bam: String,
- start_time: SystemTime,
- end_time: SystemTime,
- is_done: bool,
- log: Vec<String>,
- }
- impl Dorado {
- pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
- let data_dir = "/data/longreads_basic_pipe";
- let case_dir = format!("{}/{}", data_dir, case.id);
- let time_dir = format!("{}/{}", case_dir, case.time_point);
- let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
- Ok(Self {
- config,
- start_time: SystemTime::now(),
- end_time: SystemTime::now(),
- is_done: false,
- log: Vec::new(),
- case_dir,
- time_dir,
- bam,
- case,
- })
- }
- fn create_reference_mmi(&self) -> anyhow::Result<()> {
- if !std::path::Path::new(&self.config.align.ref_mmi).exists() {
- cmd!(
- "minimap2",
- "-x",
- "map-ont",
- "-d",
- &self.config.align.ref_mmi,
- &self.config.align.ref_fa
- )
- .run()?;
- }
- Ok(())
- }
- fn create_directories(&self) -> anyhow::Result<()> {
- if !std::path::Path::new(&self.case_dir).exists() {
- fs::create_dir(&self.case_dir)?;
- }
- if !std::path::Path::new(&self.time_dir).exists() {
- fs::create_dir(&self.time_dir)?;
- }
- Ok(())
- }
- fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
- let pod_dir = &self.case.pod_dir.display();
- let ref_mmi = &self.config.align.ref_mmi;
- let bam = &self.bam;
- let samtools_view_threads = self.config.align.samtools_view_threads;
- let samtools_sort_threads = self.config.align.samtools_sort_threads;
- let dorado = format!(
- "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}"
- );
- let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
- let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
- let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
- info!("Running: {pipe}");
- let pipe_cmd = cmd!("bash", "-c", pipe);
- let mut reader = pipe_cmd.stdout_capture().reader()?;
- let mut buffer = [0; 1];
- let mut line = String::new();
- loop {
- match reader.read(&mut buffer) {
- Ok(0) => break, // End of output
- Ok(_) => {
- let char = buffer[0] as char;
- eprint!("{}", char);
- std::io::stderr().flush()?;
- if char == '\n' {
- // Send the complete line
- self.log.push(line.clone());
- line.clear();
- } else {
- line.push(char);
- }
- }
- Err(err) => {
- warn!("Error reading from stderr: {}", err);
- break;
- }
- }
- }
- Ok(())
- }
- pub fn index(&self) -> anyhow::Result<()> {
- let t = self.config.align.samtools_view_threads.to_string();
- let cmd = format!("index -@ {t} {}", &self.bam);
- info!("Running samtools {cmd}");
- cmd!("samtools", "index", "-@", &t, &self.bam).run()?;
- Ok(())
- }
- fn run_cramino(&self) -> anyhow::Result<()> {
- let cramino_out = format!(
- "{}/{}_{}_hs1_cramino.txt",
- self.time_dir, self.case.id, self.case.time_point
- );
- // if !Path::new(&cramino_out).exists() {
- info!("Quality control of BAM: {}", self.bam);
- let output = duct::cmd!(
- "cramino",
- "-t",
- "150",
- "--hist",
- "--checksum",
- "--karyotype",
- &self.bam
- )
- .stdout_capture()
- .unchecked()
- .run()?;
- fs::write(cramino_out, output.stdout)?;
- // }
- Ok(())
- }
- fn run_modkit(&self) -> anyhow::Result<()> {
- let mod_summary = format!(
- "{}/{}_{}_5mC_5hmC_summary.txt",
- self.time_dir, self.case.id, self.case.time_point
- );
- // if !Path::new(&mod_summary).exists() {
- info!("Generating base modification summary for BAM: {}", self.bam);
- let output = cmd!("modkit", "summary", "-t", "50", &self.bam)
- .stdout_capture()
- .unchecked()
- .run()?;
- fs::write(mod_summary, output.stdout)?;
- // }
- Ok(())
- }
- fn create_fastq(&self) -> anyhow::Result<()> {
- let bam = &self.bam;
- let fastq = format!(
- "{}/{}/{}/{}_{}.fastq.gz",
- self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
- );
- if !std::path::Path::new(&fastq).exists() {
- let samtools = format!("samtools fastq -@ 150 {bam}");
- let crabz = format!("crabz -f bgzf - -o {fastq}");
- let pipe = format!("{samtools} | {crabz}");
- info!("Running: {pipe}");
- let pipe_cmd = duct::cmd!("bash", "-c", pipe);
- pipe_cmd.run()?;
- }
- Ok(())
- }
- fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
- let into = PathBuf::from(&self.bam);
- let dir = into.parent().unwrap();
- let original_file = into.file_name().unwrap().to_string_lossy().to_string();
- let original_i = dir.join(format!("{original_file}.bai"));
- if !original_i.exists() {
- self.index()?;
- }
- let tmp_original_file = format!("{}.bam", Uuid::new_v4());
- let tmp_original = dir.join(tmp_original_file.clone());
- let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
- info!("Moving {} to {}", &into.display(), &tmp_original.display());
- fs::rename(&into, &tmp_original)?;
- info!(
- "Moving {} to {}",
- &original_i.display(),
- &tmp_original_i.display()
- );
- fs::rename(original_i, tmp_original_i.clone())?;
- let cmd = format!(
- "samtools merge -@ 160 -h {} {} {} {}",
- bam.display(),
- into.display(),
- bam.display(),
- tmp_original.display()
- );
- info!("Running {cmd}");
- cmd!(
- "samtools",
- "merge",
- "-@",
- "160",
- "-h",
- bam,
- into,
- bam,
- tmp_original.clone()
- )
- .run()?;
- fs::remove_file(tmp_original)?;
- fs::remove_file(tmp_original_i)?;
- self.index()?;
- Ok(())
- }
- pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
- // Creating a temporary directory
- let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
- info!("Creating tmp dir {tmp_dir}");
- fs::create_dir(&tmp_dir)?;
- // Dorado base calling and align into a temporary bam file
- let muxed_bam = format!("{tmp_dir}/muxed.bam");
- let dorado_bin = &config.align.dorado_bin;
- let dorado_arg = &config.align.dorado_basecall_arg;
- let pod_dir = cases[0].pod_dir.display();
- let ref_mmi = &config.align.ref_mmi;
- let samtools_view_threads = config.align.samtools_view_threads;
- let dorado = format!(
- "{dorado_bin} basecaller {dorado_arg} {pod_dir} --trim all --reference {ref_mmi}"
- );
- let samtools_view =
- format!("samtools view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
- let pipe = format!("{dorado} | {samtools_view}");
- info!("Running: {pipe}");
- let pipe_cmd = cmd!("bash", "-c", pipe);
- pipe_cmd.run()?;
- info!("Basecalling ✅");
- // Demux the temporary bam file
- // Get the sequencing kit from the first pod5 file
- let muxed_pod_dir = &cases.first().unwrap().pod_dir;
- let pod_path = fs::read_dir(muxed_pod_dir)?
- .filter_map(|p| p.ok())
- .map(|p| p.path())
- .filter(|p| p.extension().unwrap() == "pod5")
- .take(1)
- .collect::<Vec<PathBuf>>()
- .pop()
- .unwrap();
- let sequencing_kit = pandora_lib_pod5::Pod5Info::from_pod5(pod_path.to_str().unwrap())
- .sequencing_kit
- .to_uppercase();
- let tmp_demux_dir = format!("{tmp_dir}/demuxed");
- fs::create_dir(&tmp_demux_dir)?;
- info!("Demux from {sequencing_kit} into {tmp_demux_dir}",);
- duct::cmd!(
- &config.align.dorado_bin,
- "demux",
- "--output-dir",
- &tmp_demux_dir,
- "--kit-name",
- &sequencing_kit,
- &tmp_dir,
- )
- .run()?;
- info!("Demux ✅");
- for case in cases.iter() {
- let bam = format!(
- "{tmp_demux_dir}/{sequencing_kit}_barcode{}.bam",
- case.barcode
- );
- // Trim
- let trimmed_bam = format!(
- "{tmp_demux_dir}/{sequencing_kit}_barcode{}_trimmed.bam",
- case.barcode
- );
- let pipe = format!(
- "{} trim {bam} | samtools view -h -@ {} -b /dev/stdin -o {trimmed_bam}",
- config.align.dorado_bin, &config.align.samtools_view_threads
- );
- cmd!("bash", "-c", pipe).run()?;
- // Align
- let aligned_bam = format!(
- "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
- case.barcode
- );
- let dorado = format!(
- "{} aligner --threads 160 {} {trimmed_bam}",
- config.align.dorado_bin, config.align.ref_fa,
- );
- let samtools_view = format!(
- "samtools view -h -@ {} -b /dev/stdin",
- &config.align.samtools_view_threads
- );
- let samtools_sort = format!(
- "samtools sort -@ {} /dev/stdin -o {aligned_bam}",
- &config.align.samtools_sort_threads
- );
- let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
- info!("Running {pipe}");
- cmd!("bash", "-c", pipe).run()?;
- let d = Dorado::init(case.clone(), config.clone())?;
- d.create_directories()?;
- if PathBuf::from(&d.bam).exists() {
- info!("merge");
- d.merge_bam(&PathBuf::from(aligned_bam))?;
- } else {
- info!("Moving from {} to {}", bam, d.bam);
- fs::rename(aligned_bam, d.bam.clone())?;
- d.index()?;
- }
- d.run_cramino()?;
- d.run_modkit()?;
- }
- fs::remove_dir(tmp_dir)?;
- Ok(())
- }
- pub fn run_pipe(&mut self) -> anyhow::Result<()> {
- let start_time = std::time::SystemTime::now();
- self.start_time = start_time;
- info!("Running Dorado with params: {:#?}", self.config);
- let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
- self.create_reference_mmi()?;
- self.create_directories()?;
- info!(
- "Reading {} pod5 from: {}",
- self.case.time_point, self.config.pod_dir
- );
- let bam_path = std::path::Path::new(&self.bam);
- if !bam_path.exists() {
- self.basecall_align(dorado_bin)?;
- } else {
- let new_bam_path = bam_path
- .parent()
- .unwrap()
- .join(format!("{}.bam", Uuid::new_v4()));
- warn!("Creating new bam {}", new_bam_path.display());
- self.basecall_align(dorado_bin)?;
- self.merge_bam(&new_bam_path)?;
- }
- self.run_cramino()?;
- self.run_modkit()?;
- self.create_fastq()?;
- let end_time = std::time::SystemTime::now();
- self.end_time = end_time;
- let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
- info!(
- "Dorado and Minimap2 execution time: {} seconds",
- execution_time
- );
- self.is_done = true;
- Ok(())
- }
- }
|