|
@@ -1,19 +1,13 @@
|
|
|
use std::{
|
|
use std::{
|
|
|
- fs::{self, File},
|
|
|
|
|
|
|
+ fs::{self},
|
|
|
path::{Path, PathBuf},
|
|
path::{Path, PathBuf},
|
|
|
- time::SystemTime,
|
|
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
use anyhow::Context;
|
|
use anyhow::Context;
|
|
|
-use duct::cmd;
|
|
|
|
|
-use log::{debug, info, warn};
|
|
|
|
|
-use uuid::Uuid;
|
|
|
|
|
|
|
|
|
|
use crate::{
|
|
use crate::{
|
|
|
- collection::{bam::bam_composition, flowcells::FlowCell, pod5::FlowCellCase},
|
|
|
|
|
commands::{Command, SlurmParams},
|
|
commands::{Command, SlurmParams},
|
|
|
config::Config,
|
|
config::Config,
|
|
|
- helpers::find_unique_file,
|
|
|
|
|
io::pod5_infos::Pod5Info,
|
|
io::pod5_infos::Pod5Info,
|
|
|
slurm_helpers::max_gpu_per_node,
|
|
slurm_helpers::max_gpu_per_node,
|
|
|
};
|
|
};
|
|
@@ -151,7 +145,8 @@ impl Command for DoradoBasecall {
|
|
|
/// ```
|
|
/// ```
|
|
|
impl super::SlurmRunner for DoradoBasecall {
|
|
impl super::SlurmRunner for DoradoBasecall {
|
|
|
fn slurm_args(&self) -> Vec<String> {
|
|
fn slurm_args(&self) -> Vec<String> {
|
|
|
- let (gpu, n) = if let (Some(h100_av), Some(a100_av)) = (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
|
|
|
|
|
|
|
+ let (gpu, n) = if let (Some(h100_av), Some(a100_av)) =
|
|
|
|
|
+ (max_gpu_per_node("h100"), max_gpu_per_node("a100"))
|
|
|
{
|
|
{
|
|
|
let (gpu, n) = if h100_av >= a100_av {
|
|
let (gpu, n) = if h100_av >= a100_av {
|
|
|
("h100", h100_av)
|
|
("h100", h100_av)
|
|
@@ -328,551 +323,551 @@ pub struct DoradoParams {
|
|
|
pub samtools_sort_threads: u16,
|
|
pub samtools_sort_threads: u16,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-pub struct Dorado {
|
|
|
|
|
- config: Config,
|
|
|
|
|
- case: FlowCellCase,
|
|
|
|
|
- case_dir: String,
|
|
|
|
|
- time_dir: String,
|
|
|
|
|
- bam: String,
|
|
|
|
|
- start_time: SystemTime,
|
|
|
|
|
- end_time: SystemTime,
|
|
|
|
|
- is_done: bool,
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-impl Dorado {
|
|
|
|
|
- pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
|
|
|
|
|
- let data_dir = &config.result_dir;
|
|
|
|
|
- let case_dir = format!("{}/{}", data_dir, case.id);
|
|
|
|
|
- let time_dir = format!("{}/{}", case_dir, case.time_point);
|
|
|
|
|
- let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
|
|
|
|
|
- debug!("Dorado init with config: {config:#?}");
|
|
|
|
|
- info!("Final BAM file: {bam}");
|
|
|
|
|
-
|
|
|
|
|
- Ok(Self {
|
|
|
|
|
- config,
|
|
|
|
|
- start_time: SystemTime::now(),
|
|
|
|
|
- end_time: SystemTime::now(),
|
|
|
|
|
- is_done: false,
|
|
|
|
|
- case_dir,
|
|
|
|
|
- time_dir,
|
|
|
|
|
- bam,
|
|
|
|
|
- case,
|
|
|
|
|
- })
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
- // Small helper to actually execute a shell command
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
- fn run_shell(cmdline: &str) -> anyhow::Result<()> {
|
|
|
|
|
- info!("Running: {cmdline}");
|
|
|
|
|
- cmd!("bash", "-c", cmdline)
|
|
|
|
|
- .run()
|
|
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to run: {cmdline}\n\t{}", e.to_string()))?;
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
- // Command builders (return strings)
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
- /// minimap2 index creation (returns None if index already exists)
|
|
|
|
|
- fn create_reference_mmi_cmd(&self) -> Option<String> {
|
|
|
|
|
- if std::path::Path::new(&self.config.align.ref_mmi).exists() {
|
|
|
|
|
- None
|
|
|
|
|
- } else {
|
|
|
|
|
- Some(format!(
|
|
|
|
|
- "minimap2 -x map-ont -d {} {}",
|
|
|
|
|
- self.config.align.ref_mmi, self.config.align.ref_fa
|
|
|
|
|
- ))
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// Dorado + samtools pipeline for basecalling + alignment
|
|
|
|
|
- fn basecall_align_cmd(&self, dorado_bin: &str) -> anyhow::Result<String> {
|
|
|
|
|
- let pod_dir = &self.case.pod_dir;
|
|
|
|
|
- let ref_fa = &self.config.align.ref_fa;
|
|
|
|
|
- let bam = &self.bam;
|
|
|
|
|
- let samtools = &self.config.align.samtools_bin;
|
|
|
|
|
- let samtools_view_threads = self.config.align.samtools_view_threads;
|
|
|
|
|
- let samtools_sort_threads = self.config.align.samtools_sort_threads;
|
|
|
|
|
- let dorado_arg = self.config.align.dorado_basecall_arg.clone();
|
|
|
|
|
-
|
|
|
|
|
- let pod_path = fs::read_dir(pod_dir)
|
|
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))?
|
|
|
|
|
- .filter_map(|p| p.ok())
|
|
|
|
|
- .map(|p| p.path())
|
|
|
|
|
- .filter(|p| p.extension().unwrap() == "pod5")
|
|
|
|
|
- .take(1)
|
|
|
|
|
- .collect::<Vec<PathBuf>>()
|
|
|
|
|
- .pop()
|
|
|
|
|
- .unwrap();
|
|
|
|
|
-
|
|
|
|
|
- let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
|
|
|
|
|
- .sequencing_kit
|
|
|
|
|
- .to_uppercase();
|
|
|
|
|
-
|
|
|
|
|
- let dorado = format!(
|
|
|
|
|
- "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_fa}",
|
|
|
|
|
- pod_dir.display()
|
|
|
|
|
- );
|
|
|
|
|
- info!("Dorado command: {dorado}");
|
|
|
|
|
-
|
|
|
|
|
- let samtools_view = format!("{samtools} view -h -@ {samtools_view_threads} -b /dev/stdin");
|
|
|
|
|
- let samtools_sort =
|
|
|
|
|
- format!("{samtools} sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
|
|
|
|
|
-
|
|
|
|
|
- Ok(format!("{dorado} | {samtools_view} | {samtools_sort}"))
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// samtools index command
|
|
|
|
|
- fn index_cmd(&self) -> String {
|
|
|
|
|
- let t = self.config.align.samtools_view_threads.to_string();
|
|
|
|
|
- format!(
|
|
|
|
|
- "{} index -@ {t} {}",
|
|
|
|
|
- self.config.align.samtools_bin, self.bam
|
|
|
|
|
- )
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// cramino QC command
|
|
|
|
|
- fn cramino_cmd(&self) -> String {
|
|
|
|
|
- format!("cramino -t 150 --karyotype {}", self.bam)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// modkit summary command
|
|
|
|
|
- fn modkit_cmd(&self) -> String {
|
|
|
|
|
- format!("modkit summary -t 50 {}", self.bam)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// fastq export pipeline from BAM
|
|
|
|
|
- fn create_fastq_cmd(&self) -> String {
|
|
|
|
|
- let bam = &self.bam;
|
|
|
|
|
- let fastq = format!(
|
|
|
|
|
- "{}/{}/{}/{}_{}.fastq.gz",
|
|
|
|
|
- self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
|
|
|
|
|
- );
|
|
|
|
|
- let samtools = format!("samtools fastq -@ 150 {bam}");
|
|
|
|
|
- let crabz = format!("crabz -f bgzf - -o {fastq}");
|
|
|
|
|
- format!("{samtools} | {crabz}")
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// samtools merge command used in `merge_bam`
|
|
|
|
|
- fn merge_bam_cmd(&self, bam: &Path, into: &Path) -> String {
|
|
|
|
|
- format!(
|
|
|
|
|
- "{} merge -@ 160 -h {} {} {} {}",
|
|
|
|
|
- self.config.align.samtools_bin,
|
|
|
|
|
- bam.display(),
|
|
|
|
|
- into.display(),
|
|
|
|
|
- bam.display(),
|
|
|
|
|
- into.display() // placeholder, real tmp path is managed outside
|
|
|
|
|
- )
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // mux basecall + samtools view into muxed.bam
|
|
|
|
|
- fn from_mux_basecall_cmd(
|
|
|
|
|
- config: &Config,
|
|
|
|
|
- sequencing_kit: &str,
|
|
|
|
|
- pod_dir: &str,
|
|
|
|
|
- muxed_bam: &str,
|
|
|
|
|
- ) -> String {
|
|
|
|
|
- let dorado_bin = &config.align.dorado_bin;
|
|
|
|
|
- let dorado_arg = &config.align.dorado_basecall_arg;
|
|
|
|
|
- let ref_mmi = &config.align.ref_mmi;
|
|
|
|
|
- let samtools_bin = &config.align.samtools_bin;
|
|
|
|
|
- let samtools_view_threads = config.align.samtools_view_threads;
|
|
|
|
|
-
|
|
|
|
|
- let dorado = format!(
|
|
|
|
|
- "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
|
|
|
|
|
- );
|
|
|
|
|
- let samtools_view =
|
|
|
|
|
- format!("{samtools_bin} view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
|
|
|
|
|
- format!("{dorado} | {samtools_view}")
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// samtools split command for demux
|
|
|
|
|
- fn demux_cmd(config: &Config, muxed_bam: &str, tmp_demux_dir: &str) -> String {
|
|
|
|
|
- format!(
|
|
|
|
|
- "{} split -@ {} -f '{}/%*_%!.bam' {}",
|
|
|
|
|
- config.align.samtools_bin, config.align.samtools_view_threads, tmp_demux_dir, muxed_bam
|
|
|
|
|
- )
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /// dorado aligner + samtools for realignment in from_mux
|
|
|
|
|
- fn realign_cmd(
|
|
|
|
|
- config: &Config,
|
|
|
|
|
- sequencing_kit: &str,
|
|
|
|
|
- barcode: &str,
|
|
|
|
|
- bam: &str,
|
|
|
|
|
- aligned_bam: &str,
|
|
|
|
|
- ) -> String {
|
|
|
|
|
- let dorado = format!(
|
|
|
|
|
- "{} aligner --threads {} {} {}",
|
|
|
|
|
- config.align.dorado_bin, config.align.dorado_aligner_threads, config.align.ref_fa, bam,
|
|
|
|
|
- );
|
|
|
|
|
- let samtools_view = format!(
|
|
|
|
|
- "{} view -h -@ {} -b /dev/stdin",
|
|
|
|
|
- config.align.samtools_bin, config.align.samtools_view_threads
|
|
|
|
|
- );
|
|
|
|
|
- let samtools_sort = format!(
|
|
|
|
|
- "{} sort -@ {} /dev/stdin -o {}",
|
|
|
|
|
- config.align.samtools_bin, config.align.samtools_sort_threads, aligned_bam
|
|
|
|
|
- );
|
|
|
|
|
- let _ = sequencing_kit; // not used here but kept for symmetry
|
|
|
|
|
- format!("{dorado} | {samtools_view} | {samtools_sort}")
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
- // Workflow methods that now *run* the commands
|
|
|
|
|
- // ------------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
- fn create_reference_mmi(&self) -> anyhow::Result<()> {
|
|
|
|
|
- if let Some(cmdline) = self.create_reference_mmi_cmd() {
|
|
|
|
|
- Self::run_shell(&cmdline)?;
|
|
|
|
|
- }
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- fn create_directories(&self) -> anyhow::Result<()> {
|
|
|
|
|
- if !std::path::Path::new(&self.case_dir).exists() {
|
|
|
|
|
- info!("Creating directory {}", self.case_dir);
|
|
|
|
|
- fs::create_dir(&self.case_dir)?;
|
|
|
|
|
- }
|
|
|
|
|
- if !std::path::Path::new(&self.time_dir).exists() {
|
|
|
|
|
- info!("Creating directory {}", self.time_dir);
|
|
|
|
|
- fs::create_dir(&self.time_dir)?;
|
|
|
|
|
- }
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
|
|
|
|
|
- let pipe = self.basecall_align_cmd(dorado_bin)?;
|
|
|
|
|
- Self::run_shell(&pipe)
|
|
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn index(&self) -> anyhow::Result<()> {
|
|
|
|
|
- let cmdline = self.index_cmd();
|
|
|
|
|
- info!("Running samtools index for {}", self.bam);
|
|
|
|
|
- Self::run_shell(&cmdline)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn run_cramino(&self) -> anyhow::Result<()> {
|
|
|
|
|
- let cramino_out = format!(
|
|
|
|
|
- "{}/{}_{}_hs1_cramino.txt",
|
|
|
|
|
- self.time_dir, self.case.id, self.case.time_point
|
|
|
|
|
- );
|
|
|
|
|
- info!("Quality control with cramino for BAM: {}", self.bam);
|
|
|
|
|
- let cmdline = self.cramino_cmd();
|
|
|
|
|
-
|
|
|
|
|
- let output = cmd!("bash", "-c", &cmdline)
|
|
|
|
|
- .stdout_capture()
|
|
|
|
|
- .unchecked()
|
|
|
|
|
- .run()?;
|
|
|
|
|
-
|
|
|
|
|
- fs::write(cramino_out, output.stdout)?;
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn run_modkit(&self) -> anyhow::Result<()> {
|
|
|
|
|
- let mod_summary = format!(
|
|
|
|
|
- "{}/{}_{}_5mC_5hmC_summary.txt",
|
|
|
|
|
- self.time_dir, self.case.id, self.case.time_point
|
|
|
|
|
- );
|
|
|
|
|
- info!("Generating base modification summary for BAM: {}", self.bam);
|
|
|
|
|
- let cmdline = self.modkit_cmd();
|
|
|
|
|
-
|
|
|
|
|
- let output = cmd!("bash", "-c", &cmdline)
|
|
|
|
|
- .stdout_capture()
|
|
|
|
|
- .unchecked()
|
|
|
|
|
- .run()?;
|
|
|
|
|
-
|
|
|
|
|
- fs::write(mod_summary, output.stdout)?;
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn create_fastq(&self) -> anyhow::Result<()> {
|
|
|
|
|
- let fastq = format!(
|
|
|
|
|
- "{}/{}/{}/{}_{}.fastq.gz",
|
|
|
|
|
- self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
|
|
|
|
|
- );
|
|
|
|
|
- if !std::path::Path::new(&fastq).exists() {
|
|
|
|
|
- let pipe = self.create_fastq_cmd();
|
|
|
|
|
- Self::run_shell(&pipe)?;
|
|
|
|
|
- }
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
|
|
|
|
|
- let composition_a: Vec<String> = bam_composition(bam.to_string_lossy().as_ref(), 20000)?
|
|
|
|
|
- .iter()
|
|
|
|
|
- .map(|(i, _, _)| i.clone())
|
|
|
|
|
- .collect();
|
|
|
|
|
- let composition_b: Vec<String> = bam_composition(&self.bam, 20000)?
|
|
|
|
|
- .iter()
|
|
|
|
|
- .map(|(i, _, _)| i.clone())
|
|
|
|
|
- .collect();
|
|
|
|
|
- let n_id = composition_a
|
|
|
|
|
- .iter()
|
|
|
|
|
- .filter(|id| composition_b.contains(id))
|
|
|
|
|
- .count();
|
|
|
|
|
- if n_id > 0 {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "{} is already merged, reads with the same run_id in the destination BAM.",
|
|
|
|
|
- self.case.id
|
|
|
|
|
- );
|
|
|
|
|
- return Ok(());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let into = PathBuf::from(&self.bam);
|
|
|
|
|
- let dir = into.parent().unwrap();
|
|
|
|
|
-
|
|
|
|
|
- let original_file = into.file_name().unwrap().to_string_lossy().to_string();
|
|
|
|
|
- let original_i = dir.join(format!("{original_file}.bai"));
|
|
|
|
|
- if !original_i.exists() {
|
|
|
|
|
- self.index()?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let tmp_original_file = format!("{}.bam", Uuid::new_v4());
|
|
|
|
|
- let tmp_original = dir.join(tmp_original_file.clone());
|
|
|
|
|
- let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
|
|
|
|
|
-
|
|
|
|
|
- info!("Moving {} to {}", &into.display(), &tmp_original.display());
|
|
|
|
|
- fs::rename(&into, &tmp_original)?;
|
|
|
|
|
- info!(
|
|
|
|
|
- "Moving {} to {}",
|
|
|
|
|
- &original_i.display(),
|
|
|
|
|
- &tmp_original_i.display()
|
|
|
|
|
- );
|
|
|
|
|
- fs::rename(original_i, tmp_original_i.clone())?;
|
|
|
|
|
-
|
|
|
|
|
- // real merge command with the correct tmp path
|
|
|
|
|
- let merge_cmdline = format!(
|
|
|
|
|
- "{} merge -@ 160 -h {} {} {} {}",
|
|
|
|
|
- self.config.align.samtools_bin,
|
|
|
|
|
- bam.display(),
|
|
|
|
|
- into.display(),
|
|
|
|
|
- bam.display(),
|
|
|
|
|
- tmp_original.display()
|
|
|
|
|
- );
|
|
|
|
|
- info!("Running {merge_cmdline}");
|
|
|
|
|
- Self::run_shell(&merge_cmdline)?;
|
|
|
|
|
-
|
|
|
|
|
- fs::remove_file(tmp_original)?;
|
|
|
|
|
- fs::remove_file(tmp_original_i)?;
|
|
|
|
|
- fs::remove_file(bam)?;
|
|
|
|
|
-
|
|
|
|
|
- self.index()?;
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
|
|
|
|
|
- // tmp dir
|
|
|
|
|
- let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
|
|
|
|
|
- info!("Creating tmp dir {tmp_dir}");
|
|
|
|
|
- fs::create_dir(&tmp_dir)?;
|
|
|
|
|
-
|
|
|
|
|
- // basecalling into muxed.bam
|
|
|
|
|
- let muxed_bam = format!("{tmp_dir}/muxed.bam");
|
|
|
|
|
- let pod_dir = cases[0].pod_dir.display().to_string();
|
|
|
|
|
-
|
|
|
|
|
- let muxed_pod_dir = &cases.first().unwrap().pod_dir;
|
|
|
|
|
- let pod_path = fs::read_dir(muxed_pod_dir)
|
|
|
|
|
- .map_err(|e| {
|
|
|
|
|
- anyhow::anyhow!(
|
|
|
|
|
- "Failed to read pod5 dir: {}.\n\t{e}",
|
|
|
|
|
- muxed_pod_dir.display()
|
|
|
|
|
- )
|
|
|
|
|
- })?
|
|
|
|
|
- .filter_map(|p| p.ok())
|
|
|
|
|
- .map(|p| p.path())
|
|
|
|
|
- .filter(|p| p.extension().unwrap() == "pod5")
|
|
|
|
|
- .take(1)
|
|
|
|
|
- .collect::<Vec<PathBuf>>()
|
|
|
|
|
- .pop()
|
|
|
|
|
- .unwrap();
|
|
|
|
|
- let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
|
|
|
|
|
- .sequencing_kit
|
|
|
|
|
- .to_uppercase();
|
|
|
|
|
-
|
|
|
|
|
- let basecall_pipe =
|
|
|
|
|
- Self::from_mux_basecall_cmd(&config, &sequencing_kit, &pod_dir, &muxed_bam);
|
|
|
|
|
- info!("Running: {basecall_pipe}");
|
|
|
|
|
- Self::run_shell(&basecall_pipe)?;
|
|
|
|
|
- info!("Basecalling ✅");
|
|
|
|
|
-
|
|
|
|
|
- // demux
|
|
|
|
|
- let tmp_demux_dir = format!("{tmp_dir}/demuxed");
|
|
|
|
|
- fs::create_dir(&tmp_demux_dir)?;
|
|
|
|
|
- let demux_cmdline = Self::demux_cmd(&config, &muxed_bam, &tmp_demux_dir);
|
|
|
|
|
- info!("Demux from {sequencing_kit} into {tmp_demux_dir}");
|
|
|
|
|
- info!("Running: {demux_cmdline}");
|
|
|
|
|
- Self::run_shell(&demux_cmdline)?;
|
|
|
|
|
- info!("Demux ✅");
|
|
|
|
|
-
|
|
|
|
|
- for case in cases.iter() {
|
|
|
|
|
- let barcode = case.barcode.replace("NB", "");
|
|
|
|
|
- let bam = find_unique_file(
|
|
|
|
|
- &tmp_demux_dir,
|
|
|
|
|
- &format!("{sequencing_kit}_barcode{}.bam", barcode),
|
|
|
|
|
- )?;
|
|
|
|
|
-
|
|
|
|
|
- let aligned_bam = if !config.align.dorado_should_realign {
|
|
|
|
|
- bam.clone()
|
|
|
|
|
- } else {
|
|
|
|
|
- let aligned_bam = format!(
|
|
|
|
|
- "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
|
|
|
|
|
- barcode
|
|
|
|
|
- );
|
|
|
|
|
- let pipe =
|
|
|
|
|
- Self::realign_cmd(&config, &sequencing_kit, &barcode, &bam, &aligned_bam);
|
|
|
|
|
- info!("Running {pipe}");
|
|
|
|
|
- Self::run_shell(&pipe)?;
|
|
|
|
|
- info!("Alignement ✅");
|
|
|
|
|
- aligned_bam.into()
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let d = Dorado::init(case.clone(), config.clone())?;
|
|
|
|
|
- d.create_directories()?;
|
|
|
|
|
-
|
|
|
|
|
- if PathBuf::from(&d.bam).exists() {
|
|
|
|
|
- info!("Merging");
|
|
|
|
|
- d.merge_bam(&PathBuf::from(aligned_bam))?;
|
|
|
|
|
- } else {
|
|
|
|
|
- info!("Moving from {} to {}", bam, d.bam);
|
|
|
|
|
- fs::rename(aligned_bam, d.bam.clone())?;
|
|
|
|
|
- d.index()?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- d.run_cramino()?;
|
|
|
|
|
- d.run_modkit()?;
|
|
|
|
|
- }
|
|
|
|
|
- fs::remove_dir_all(tmp_dir)?;
|
|
|
|
|
-
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pub fn run_pipe(&mut self) -> anyhow::Result<()> {
|
|
|
|
|
- let start_time = std::time::SystemTime::now();
|
|
|
|
|
- self.start_time = start_time;
|
|
|
|
|
-
|
|
|
|
|
- debug!("Running Dorado with config: {:#?}", self.config);
|
|
|
|
|
- let dorado_bin = self.config.align.dorado_bin.clone();
|
|
|
|
|
-
|
|
|
|
|
- self.create_reference_mmi()?;
|
|
|
|
|
- self.create_directories()?;
|
|
|
|
|
-
|
|
|
|
|
- info!(
|
|
|
|
|
- "Reading {} pod5 from: {}",
|
|
|
|
|
- self.case.time_point, self.config.pod_dir
|
|
|
|
|
- );
|
|
|
|
|
- let bam_path = std::path::Path::new(&self.bam);
|
|
|
|
|
-
|
|
|
|
|
- if !bam_path.exists() {
|
|
|
|
|
- info!("Creating new bam file");
|
|
|
|
|
- self.basecall_align(&dorado_bin)?;
|
|
|
|
|
- self.index()?;
|
|
|
|
|
- } else {
|
|
|
|
|
- let new_bam_path = bam_path
|
|
|
|
|
- .parent()
|
|
|
|
|
- .unwrap()
|
|
|
|
|
- .join(format!("{}.bam", Uuid::new_v4()));
|
|
|
|
|
- warn!("Creating new bam {}", new_bam_path.display());
|
|
|
|
|
-
|
|
|
|
|
- let bam = self.bam.clone();
|
|
|
|
|
- self.bam = new_bam_path.clone().to_string_lossy().to_string();
|
|
|
|
|
- self.basecall_align(&dorado_bin)?;
|
|
|
|
|
- self.bam.clone_from(&bam);
|
|
|
|
|
- self.merge_bam(&new_bam_path)?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- self.run_cramino()?;
|
|
|
|
|
- self.run_modkit()?;
|
|
|
|
|
- // self.create_fastq()?;
|
|
|
|
|
-
|
|
|
|
|
- let end_time = std::time::SystemTime::now();
|
|
|
|
|
- self.end_time = end_time;
|
|
|
|
|
- let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
|
|
|
|
|
- info!(
|
|
|
|
|
- "Dorado and Minimap2 execution time: {} seconds",
|
|
|
|
|
- execution_time
|
|
|
|
|
- );
|
|
|
|
|
- self.is_done = true;
|
|
|
|
|
-
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // from_flowcell stays mostly as-is; it just calls run_pipe/from_mux
|
|
|
|
|
- pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
|
|
|
|
|
- let tp_conv = |time_point: &str| -> String {
|
|
|
|
|
- match time_point {
|
|
|
|
|
- "normal" => config.normal_name.clone(),
|
|
|
|
|
- "tumoral" => config.tumoral_name.clone(),
|
|
|
|
|
- _ => panic!("Error time point name"),
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
- use crate::collection::flowcells::FlowCellLocation::*;
|
|
|
|
|
- let base_pod_dir = match &flowcell.location {
|
|
|
|
|
- Local(_) => None,
|
|
|
|
|
- Archived(pod_tar) => {
|
|
|
|
|
- let file = File::open(pod_tar)
|
|
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?;
|
|
|
|
|
- let mut archive = tar::Archive::new(file);
|
|
|
|
|
- info!("Un-tar of archived {pod_tar}");
|
|
|
|
|
- archive
|
|
|
|
|
- .unpack(&config.unarchive_tmp_dir)
|
|
|
|
|
- .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?;
|
|
|
|
|
- info!("Un-tar of archived {pod_tar} Done.");
|
|
|
|
|
-
|
|
|
|
|
- Some(config.unarchive_tmp_dir.to_string())
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- use crate::collection::flowcells::FlowCellExperiment::*;
|
|
|
|
|
- match &flowcell.experiment {
|
|
|
|
|
- WGSPod5Mux(pod_dir) => {
|
|
|
|
|
- let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
|
|
|
|
|
- format!("{base_pod_dir}/{pod_dir}")
|
|
|
|
|
- } else {
|
|
|
|
|
- pod_dir.clone()
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let cases = flowcell
|
|
|
|
|
- .cases
|
|
|
|
|
- .iter()
|
|
|
|
|
- .map(|c| FlowCellCase {
|
|
|
|
|
- id: c.case_id.clone(),
|
|
|
|
|
- time_point: tp_conv(&c.sample_type),
|
|
|
|
|
- barcode: c.barcode.clone(),
|
|
|
|
|
- pod_dir: pod_dir.clone().into(),
|
|
|
|
|
- })
|
|
|
|
|
- .collect();
|
|
|
|
|
- info!("Starting basecaller for muxed pod5: {cases:#?}");
|
|
|
|
|
-
|
|
|
|
|
- Dorado::from_mux(cases, config.clone())?;
|
|
|
|
|
- }
|
|
|
|
|
- WGSPod5Demux(pod_dir) => {
|
|
|
|
|
- let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
|
|
|
|
|
- format!("{base_pod_dir}/{pod_dir}")
|
|
|
|
|
- } else {
|
|
|
|
|
- pod_dir.clone()
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- for c in flowcell.cases.iter() {
|
|
|
|
|
- let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", ""));
|
|
|
|
|
- info!("Starting basecaller for demuxed pod5: {pod_dir}");
|
|
|
|
|
- let mut d = Dorado::init(
|
|
|
|
|
- FlowCellCase {
|
|
|
|
|
- id: c.case_id.clone(),
|
|
|
|
|
- time_point: tp_conv(&c.sample_type),
|
|
|
|
|
- barcode: c.barcode.clone(),
|
|
|
|
|
- pod_dir: pod_dir.into(),
|
|
|
|
|
- },
|
|
|
|
|
- config.clone(),
|
|
|
|
|
- )?;
|
|
|
|
|
- d.run_pipe()?;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
|
|
+// pub struct Dorado {
|
|
|
|
|
+// config: Config,
|
|
|
|
|
+// case: FlowCellCase,
|
|
|
|
|
+// case_dir: String,
|
|
|
|
|
+// time_dir: String,
|
|
|
|
|
+// bam: String,
|
|
|
|
|
+// start_time: SystemTime,
|
|
|
|
|
+// end_time: SystemTime,
|
|
|
|
|
+// is_done: bool,
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// impl Dorado {
|
|
|
|
|
+// pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
|
|
|
|
|
+// let data_dir = &config.result_dir;
|
|
|
|
|
+// let case_dir = format!("{}/{}", data_dir, case.id);
|
|
|
|
|
+// let time_dir = format!("{}/{}", case_dir, case.time_point);
|
|
|
|
|
+// let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
|
|
|
|
|
+// debug!("Dorado init with config: {config:#?}");
|
|
|
|
|
+// info!("Final BAM file: {bam}");
|
|
|
|
|
+//
|
|
|
|
|
+// Ok(Self {
|
|
|
|
|
+// config,
|
|
|
|
|
+// start_time: SystemTime::now(),
|
|
|
|
|
+// end_time: SystemTime::now(),
|
|
|
|
|
+// is_done: false,
|
|
|
|
|
+// case_dir,
|
|
|
|
|
+// time_dir,
|
|
|
|
|
+// bam,
|
|
|
|
|
+// case,
|
|
|
|
|
+// })
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+// // Small helper to actually execute a shell command
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+// fn run_shell(cmdline: &str) -> anyhow::Result<()> {
|
|
|
|
|
+// info!("Running: {cmdline}");
|
|
|
|
|
+// cmd!("bash", "-c", cmdline)
|
|
|
|
|
+// .run()
|
|
|
|
|
+// .map_err(|e| anyhow::anyhow!("Failed to run: {cmdline}\n\t{}", e.to_string()))?;
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+// // Command builders (return strings)
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+//
|
|
|
|
|
+// /// minimap2 index creation (returns None if index already exists)
|
|
|
|
|
+// fn create_reference_mmi_cmd(&self) -> Option<String> {
|
|
|
|
|
+// if std::path::Path::new(&self.config.align.ref_mmi).exists() {
|
|
|
|
|
+// None
|
|
|
|
|
+// } else {
|
|
|
|
|
+// Some(format!(
|
|
|
|
|
+// "minimap2 -x map-ont -d {} {}",
|
|
|
|
|
+// self.config.align.ref_mmi, self.config.align.ref_fa
|
|
|
|
|
+// ))
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// Dorado + samtools pipeline for basecalling + alignment
|
|
|
|
|
+// fn basecall_align_cmd(&self, dorado_bin: &str) -> anyhow::Result<String> {
|
|
|
|
|
+// let pod_dir = &self.case.pod_dir;
|
|
|
|
|
+// let ref_fa = &self.config.align.ref_fa;
|
|
|
|
|
+// let bam = &self.bam;
|
|
|
|
|
+// let samtools = &self.config.align.samtools_bin;
|
|
|
|
|
+// let samtools_view_threads = self.config.align.samtools_view_threads;
|
|
|
|
|
+// let samtools_sort_threads = self.config.align.samtools_sort_threads;
|
|
|
|
|
+// let dorado_arg = self.config.align.dorado_basecall_arg.clone();
|
|
|
|
|
+//
|
|
|
|
|
+// let pod_path = fs::read_dir(pod_dir)
|
|
|
|
|
+// .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))?
|
|
|
|
|
+// .filter_map(|p| p.ok())
|
|
|
|
|
+// .map(|p| p.path())
|
|
|
|
|
+// .filter(|p| p.extension().unwrap() == "pod5")
|
|
|
|
|
+// .take(1)
|
|
|
|
|
+// .collect::<Vec<PathBuf>>()
|
|
|
|
|
+// .pop()
|
|
|
|
|
+// .unwrap();
|
|
|
|
|
+//
|
|
|
|
|
+// let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
|
|
|
|
|
+// .sequencing_kit
|
|
|
|
|
+// .to_uppercase();
|
|
|
|
|
+//
|
|
|
|
|
+// let dorado = format!(
|
|
|
|
|
+// "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_fa}",
|
|
|
|
|
+// pod_dir.display()
|
|
|
|
|
+// );
|
|
|
|
|
+// info!("Dorado command: {dorado}");
|
|
|
|
|
+//
|
|
|
|
|
+// let samtools_view = format!("{samtools} view -h -@ {samtools_view_threads} -b /dev/stdin");
|
|
|
|
|
+// let samtools_sort =
|
|
|
|
|
+// format!("{samtools} sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
|
|
|
|
|
+//
|
|
|
|
|
+// Ok(format!("{dorado} | {samtools_view} | {samtools_sort}"))
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// samtools index command
|
|
|
|
|
+// fn index_cmd(&self) -> String {
|
|
|
|
|
+// let t = self.config.align.samtools_view_threads.to_string();
|
|
|
|
|
+// format!(
|
|
|
|
|
+// "{} index -@ {t} {}",
|
|
|
|
|
+// self.config.align.samtools_bin, self.bam
|
|
|
|
|
+// )
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// cramino QC command
|
|
|
|
|
+// fn cramino_cmd(&self) -> String {
|
|
|
|
|
+// format!("cramino -t 150 --karyotype {}", self.bam)
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// modkit summary command
|
|
|
|
|
+// fn modkit_cmd(&self) -> String {
|
|
|
|
|
+// format!("modkit summary -t 50 {}", self.bam)
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// fastq export pipeline from BAM
|
|
|
|
|
+// fn create_fastq_cmd(&self) -> String {
|
|
|
|
|
+// let bam = &self.bam;
|
|
|
|
|
+// let fastq = format!(
|
|
|
|
|
+// "{}/{}/{}/{}_{}.fastq.gz",
|
|
|
|
|
+// self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
|
|
|
|
|
+// );
|
|
|
|
|
+// let samtools = format!("samtools fastq -@ 150 {bam}");
|
|
|
|
|
+// let crabz = format!("crabz -f bgzf - -o {fastq}");
|
|
|
|
|
+// format!("{samtools} | {crabz}")
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// samtools merge command used in `merge_bam`
|
|
|
|
|
+// fn merge_bam_cmd(&self, bam: &Path, into: &Path) -> String {
|
|
|
|
|
+// format!(
|
|
|
|
|
+// "{} merge -@ 160 -h {} {} {} {}",
|
|
|
|
|
+// self.config.align.samtools_bin,
|
|
|
|
|
+// bam.display(),
|
|
|
|
|
+// into.display(),
|
|
|
|
|
+// bam.display(),
|
|
|
|
|
+// into.display() // placeholder, real tmp path is managed outside
|
|
|
|
|
+// )
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // mux basecall + samtools view into muxed.bam
|
|
|
|
|
+// fn from_mux_basecall_cmd(
|
|
|
|
|
+// config: &Config,
|
|
|
|
|
+// sequencing_kit: &str,
|
|
|
|
|
+// pod_dir: &str,
|
|
|
|
|
+// muxed_bam: &str,
|
|
|
|
|
+// ) -> String {
|
|
|
|
|
+// let dorado_bin = &config.align.dorado_bin;
|
|
|
|
|
+// let dorado_arg = &config.align.dorado_basecall_arg;
|
|
|
|
|
+// let ref_mmi = &config.align.ref_mmi;
|
|
|
|
|
+// let samtools_bin = &config.align.samtools_bin;
|
|
|
|
|
+// let samtools_view_threads = config.align.samtools_view_threads;
|
|
|
|
|
+//
|
|
|
|
|
+// let dorado = format!(
|
|
|
|
|
+// "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
|
|
|
|
|
+// );
|
|
|
|
|
+// let samtools_view =
|
|
|
|
|
+// format!("{samtools_bin} view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
|
|
|
|
|
+// format!("{dorado} | {samtools_view}")
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// samtools split command for demux
|
|
|
|
|
+// fn demux_cmd(config: &Config, muxed_bam: &str, tmp_demux_dir: &str) -> String {
|
|
|
|
|
+// format!(
|
|
|
|
|
+// "{} split -@ {} -f '{}/%*_%!.bam' {}",
|
|
|
|
|
+// config.align.samtools_bin, config.align.samtools_view_threads, tmp_demux_dir, muxed_bam
|
|
|
|
|
+// )
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// /// dorado aligner + samtools for realignment in from_mux
|
|
|
|
|
+// fn realign_cmd(
|
|
|
|
|
+// config: &Config,
|
|
|
|
|
+// sequencing_kit: &str,
|
|
|
|
|
+// barcode: &str,
|
|
|
|
|
+// bam: &str,
|
|
|
|
|
+// aligned_bam: &str,
|
|
|
|
|
+// ) -> String {
|
|
|
|
|
+// let dorado = format!(
|
|
|
|
|
+// "{} aligner --threads {} {} {}",
|
|
|
|
|
+// config.align.dorado_bin, config.align.dorado_aligner_threads, config.align.ref_fa, bam,
|
|
|
|
|
+// );
|
|
|
|
|
+// let samtools_view = format!(
|
|
|
|
|
+// "{} view -h -@ {} -b /dev/stdin",
|
|
|
|
|
+// config.align.samtools_bin, config.align.samtools_view_threads
|
|
|
|
|
+// );
|
|
|
|
|
+// let samtools_sort = format!(
|
|
|
|
|
+// "{} sort -@ {} /dev/stdin -o {}",
|
|
|
|
|
+// config.align.samtools_bin, config.align.samtools_sort_threads, aligned_bam
|
|
|
|
|
+// );
|
|
|
|
|
+// let _ = sequencing_kit; // not used here but kept for symmetry
|
|
|
|
|
+// format!("{dorado} | {samtools_view} | {samtools_sort}")
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+// // Workflow methods that now *run* the commands
|
|
|
|
|
+// // ------------------------------------------------------------------
|
|
|
|
|
+//
|
|
|
|
|
+// fn create_reference_mmi(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// if let Some(cmdline) = self.create_reference_mmi_cmd() {
|
|
|
|
|
+// Self::run_shell(&cmdline)?;
|
|
|
|
|
+// }
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// fn create_directories(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// if !std::path::Path::new(&self.case_dir).exists() {
|
|
|
|
|
+// info!("Creating directory {}", self.case_dir);
|
|
|
|
|
+// fs::create_dir(&self.case_dir)?;
|
|
|
|
|
+// }
|
|
|
|
|
+// if !std::path::Path::new(&self.time_dir).exists() {
|
|
|
|
|
+// info!("Creating directory {}", self.time_dir);
|
|
|
|
|
+// fs::create_dir(&self.time_dir)?;
|
|
|
|
|
+// }
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
|
|
|
|
|
+// let pipe = self.basecall_align_cmd(dorado_bin)?;
|
|
|
|
|
+// Self::run_shell(&pipe)
|
|
|
|
|
+// .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn index(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// let cmdline = self.index_cmd();
|
|
|
|
|
+// info!("Running samtools index for {}", self.bam);
|
|
|
|
|
+// Self::run_shell(&cmdline)
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn run_cramino(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// let cramino_out = format!(
|
|
|
|
|
+// "{}/{}_{}_hs1_cramino.txt",
|
|
|
|
|
+// self.time_dir, self.case.id, self.case.time_point
|
|
|
|
|
+// );
|
|
|
|
|
+// info!("Quality control with cramino for BAM: {}", self.bam);
|
|
|
|
|
+// let cmdline = self.cramino_cmd();
|
|
|
|
|
+//
|
|
|
|
|
+// let output = cmd!("bash", "-c", &cmdline)
|
|
|
|
|
+// .stdout_capture()
|
|
|
|
|
+// .unchecked()
|
|
|
|
|
+// .run()?;
|
|
|
|
|
+//
|
|
|
|
|
+// fs::write(cramino_out, output.stdout)?;
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn run_modkit(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// let mod_summary = format!(
|
|
|
|
|
+// "{}/{}_{}_5mC_5hmC_summary.txt",
|
|
|
|
|
+// self.time_dir, self.case.id, self.case.time_point
|
|
|
|
|
+// );
|
|
|
|
|
+// info!("Generating base modification summary for BAM: {}", self.bam);
|
|
|
|
|
+// let cmdline = self.modkit_cmd();
|
|
|
|
|
+//
|
|
|
|
|
+// let output = cmd!("bash", "-c", &cmdline)
|
|
|
|
|
+// .stdout_capture()
|
|
|
|
|
+// .unchecked()
|
|
|
|
|
+// .run()?;
|
|
|
|
|
+//
|
|
|
|
|
+// fs::write(mod_summary, output.stdout)?;
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn create_fastq(&self) -> anyhow::Result<()> {
|
|
|
|
|
+// let fastq = format!(
|
|
|
|
|
+// "{}/{}/{}/{}_{}.fastq.gz",
|
|
|
|
|
+// self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
|
|
|
|
|
+// );
|
|
|
|
|
+// if !std::path::Path::new(&fastq).exists() {
|
|
|
|
|
+// let pipe = self.create_fastq_cmd();
|
|
|
|
|
+// Self::run_shell(&pipe)?;
|
|
|
|
|
+// }
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
|
|
|
|
|
+// let composition_a: Vec<String> = bam_composition(bam.to_string_lossy().as_ref(), 20000)?
|
|
|
|
|
+// .iter()
|
|
|
|
|
+// .map(|(i, _, _)| i.clone())
|
|
|
|
|
+// .collect();
|
|
|
|
|
+// let composition_b: Vec<String> = bam_composition(&self.bam, 20000)?
|
|
|
|
|
+// .iter()
|
|
|
|
|
+// .map(|(i, _, _)| i.clone())
|
|
|
|
|
+// .collect();
|
|
|
|
|
+// let n_id = composition_a
|
|
|
|
|
+// .iter()
|
|
|
|
|
+// .filter(|id| composition_b.contains(id))
|
|
|
|
|
+// .count();
|
|
|
|
|
+// if n_id > 0 {
|
|
|
|
|
+// warn!(
|
|
|
|
|
+// "{} is already merged, reads with the same run_id in the destination BAM.",
|
|
|
|
|
+// self.case.id
|
|
|
|
|
+// );
|
|
|
|
|
+// return Ok(());
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// let into = PathBuf::from(&self.bam);
|
|
|
|
|
+// let dir = into.parent().unwrap();
|
|
|
|
|
+//
|
|
|
|
|
+// let original_file = into.file_name().unwrap().to_string_lossy().to_string();
|
|
|
|
|
+// let original_i = dir.join(format!("{original_file}.bai"));
|
|
|
|
|
+// if !original_i.exists() {
|
|
|
|
|
+// self.index()?;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// let tmp_original_file = format!("{}.bam", Uuid::new_v4());
|
|
|
|
|
+// let tmp_original = dir.join(tmp_original_file.clone());
|
|
|
|
|
+// let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
|
|
|
|
|
+//
|
|
|
|
|
+// info!("Moving {} to {}", &into.display(), &tmp_original.display());
|
|
|
|
|
+// fs::rename(&into, &tmp_original)?;
|
|
|
|
|
+// info!(
|
|
|
|
|
+// "Moving {} to {}",
|
|
|
|
|
+// &original_i.display(),
|
|
|
|
|
+// &tmp_original_i.display()
|
|
|
|
|
+// );
|
|
|
|
|
+// fs::rename(original_i, tmp_original_i.clone())?;
|
|
|
|
|
+//
|
|
|
|
|
+// // real merge command with the correct tmp path
|
|
|
|
|
+// let merge_cmdline = format!(
|
|
|
|
|
+// "{} merge -@ 160 -h {} {} {} {}",
|
|
|
|
|
+// self.config.align.samtools_bin,
|
|
|
|
|
+// bam.display(),
|
|
|
|
|
+// into.display(),
|
|
|
|
|
+// bam.display(),
|
|
|
|
|
+// tmp_original.display()
|
|
|
|
|
+// );
|
|
|
|
|
+// info!("Running {merge_cmdline}");
|
|
|
|
|
+// Self::run_shell(&merge_cmdline)?;
|
|
|
|
|
+//
|
|
|
|
|
+// fs::remove_file(tmp_original)?;
|
|
|
|
|
+// fs::remove_file(tmp_original_i)?;
|
|
|
|
|
+// fs::remove_file(bam)?;
|
|
|
|
|
+//
|
|
|
|
|
+// self.index()?;
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
|
|
|
|
|
+// // tmp dir
|
|
|
|
|
+// let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
|
|
|
|
|
+// info!("Creating tmp dir {tmp_dir}");
|
|
|
|
|
+// fs::create_dir(&tmp_dir)?;
|
|
|
|
|
+//
|
|
|
|
|
+// // basecalling into muxed.bam
|
|
|
|
|
+// let muxed_bam = format!("{tmp_dir}/muxed.bam");
|
|
|
|
|
+// let pod_dir = cases[0].pod_dir.display().to_string();
|
|
|
|
|
+//
|
|
|
|
|
+// let muxed_pod_dir = &cases.first().unwrap().pod_dir;
|
|
|
|
|
+// let pod_path = fs::read_dir(muxed_pod_dir)
|
|
|
|
|
+// .map_err(|e| {
|
|
|
|
|
+// anyhow::anyhow!(
|
|
|
|
|
+// "Failed to read pod5 dir: {}.\n\t{e}",
|
|
|
|
|
+// muxed_pod_dir.display()
|
|
|
|
|
+// )
|
|
|
|
|
+// })?
|
|
|
|
|
+// .filter_map(|p| p.ok())
|
|
|
|
|
+// .map(|p| p.path())
|
|
|
|
|
+// .filter(|p| p.extension().unwrap() == "pod5")
|
|
|
|
|
+// .take(1)
|
|
|
|
|
+// .collect::<Vec<PathBuf>>()
|
|
|
|
|
+// .pop()
|
|
|
|
|
+// .unwrap();
|
|
|
|
|
+// let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
|
|
|
|
|
+// .sequencing_kit
|
|
|
|
|
+// .to_uppercase();
|
|
|
|
|
+//
|
|
|
|
|
+// let basecall_pipe =
|
|
|
|
|
+// Self::from_mux_basecall_cmd(&config, &sequencing_kit, &pod_dir, &muxed_bam);
|
|
|
|
|
+// info!("Running: {basecall_pipe}");
|
|
|
|
|
+// Self::run_shell(&basecall_pipe)?;
|
|
|
|
|
+// info!("Basecalling ✅");
|
|
|
|
|
+//
|
|
|
|
|
+// // demux
|
|
|
|
|
+// let tmp_demux_dir = format!("{tmp_dir}/demuxed");
|
|
|
|
|
+// fs::create_dir(&tmp_demux_dir)?;
|
|
|
|
|
+// let demux_cmdline = Self::demux_cmd(&config, &muxed_bam, &tmp_demux_dir);
|
|
|
|
|
+// info!("Demux from {sequencing_kit} into {tmp_demux_dir}");
|
|
|
|
|
+// info!("Running: {demux_cmdline}");
|
|
|
|
|
+// Self::run_shell(&demux_cmdline)?;
|
|
|
|
|
+// info!("Demux ✅");
|
|
|
|
|
+//
|
|
|
|
|
+// for case in cases.iter() {
|
|
|
|
|
+// let barcode = case.barcode.replace("NB", "");
|
|
|
|
|
+// let bam = find_unique_file(
|
|
|
|
|
+// &tmp_demux_dir,
|
|
|
|
|
+// &format!("{sequencing_kit}_barcode{}.bam", barcode),
|
|
|
|
|
+// )?;
|
|
|
|
|
+//
|
|
|
|
|
+// let aligned_bam = if !config.align.dorado_should_realign {
|
|
|
|
|
+// bam.clone()
|
|
|
|
|
+// } else {
|
|
|
|
|
+// let aligned_bam = format!(
|
|
|
|
|
+// "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
|
|
|
|
|
+// barcode
|
|
|
|
|
+// );
|
|
|
|
|
+// let pipe =
|
|
|
|
|
+// Self::realign_cmd(&config, &sequencing_kit, &barcode, &bam, &aligned_bam);
|
|
|
|
|
+// info!("Running {pipe}");
|
|
|
|
|
+// Self::run_shell(&pipe)?;
|
|
|
|
|
+// info!("Alignement ✅");
|
|
|
|
|
+// aligned_bam.into()
|
|
|
|
|
+// };
|
|
|
|
|
+//
|
|
|
|
|
+// let d = Dorado::init(case.clone(), config.clone())?;
|
|
|
|
|
+// d.create_directories()?;
|
|
|
|
|
+//
|
|
|
|
|
+// if PathBuf::from(&d.bam).exists() {
|
|
|
|
|
+// info!("Merging");
|
|
|
|
|
+// d.merge_bam(&PathBuf::from(aligned_bam))?;
|
|
|
|
|
+// } else {
|
|
|
|
|
+// info!("Moving from {} to {}", bam, d.bam);
|
|
|
|
|
+// fs::rename(aligned_bam, d.bam.clone())?;
|
|
|
|
|
+// d.index()?;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// d.run_cramino()?;
|
|
|
|
|
+// d.run_modkit()?;
|
|
|
|
|
+// }
|
|
|
|
|
+// fs::remove_dir_all(tmp_dir)?;
|
|
|
|
|
+//
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub fn run_pipe(&mut self) -> anyhow::Result<()> {
|
|
|
|
|
+// let start_time = std::time::SystemTime::now();
|
|
|
|
|
+// self.start_time = start_time;
|
|
|
|
|
+//
|
|
|
|
|
+// debug!("Running Dorado with config: {:#?}", self.config);
|
|
|
|
|
+// let dorado_bin = self.config.align.dorado_bin.clone();
|
|
|
|
|
+//
|
|
|
|
|
+// self.create_reference_mmi()?;
|
|
|
|
|
+// self.create_directories()?;
|
|
|
|
|
+//
|
|
|
|
|
+// info!(
|
|
|
|
|
+// "Reading {} pod5 from: {}",
|
|
|
|
|
+// self.case.time_point, self.config.pod_dir
|
|
|
|
|
+// );
|
|
|
|
|
+// let bam_path = std::path::Path::new(&self.bam);
|
|
|
|
|
+//
|
|
|
|
|
+// if !bam_path.exists() {
|
|
|
|
|
+// info!("Creating new bam file");
|
|
|
|
|
+// self.basecall_align(&dorado_bin)?;
|
|
|
|
|
+// self.index()?;
|
|
|
|
|
+// } else {
|
|
|
|
|
+// let new_bam_path = bam_path
|
|
|
|
|
+// .parent()
|
|
|
|
|
+// .unwrap()
|
|
|
|
|
+// .join(format!("{}.bam", Uuid::new_v4()));
|
|
|
|
|
+// warn!("Creating new bam {}", new_bam_path.display());
|
|
|
|
|
+//
|
|
|
|
|
+// let bam = self.bam.clone();
|
|
|
|
|
+// self.bam = new_bam_path.clone().to_string_lossy().to_string();
|
|
|
|
|
+// self.basecall_align(&dorado_bin)?;
|
|
|
|
|
+// self.bam.clone_from(&bam);
|
|
|
|
|
+// self.merge_bam(&new_bam_path)?;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// self.run_cramino()?;
|
|
|
|
|
+// self.run_modkit()?;
|
|
|
|
|
+// // self.create_fastq()?;
|
|
|
|
|
+//
|
|
|
|
|
+// let end_time = std::time::SystemTime::now();
|
|
|
|
|
+// self.end_time = end_time;
|
|
|
|
|
+// let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
|
|
|
|
|
+// info!(
|
|
|
|
|
+// "Dorado and Minimap2 execution time: {} seconds",
|
|
|
|
|
+// execution_time
|
|
|
|
|
+// );
|
|
|
|
|
+// self.is_done = true;
|
|
|
|
|
+//
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // from_flowcell stays mostly as-is; it just calls run_pipe/from_mux
|
|
|
|
|
+// pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
|
|
|
|
|
+// let tp_conv = |time_point: &str| -> String {
|
|
|
|
|
+// match time_point {
|
|
|
|
|
+// "normal" => config.normal_name.clone(),
|
|
|
|
|
+// "tumoral" => config.tumoral_name.clone(),
|
|
|
|
|
+// _ => panic!("Error time point name"),
|
|
|
|
|
+// }
|
|
|
|
|
+// };
|
|
|
|
|
+// use crate::collection::flowcells::FlowCellLocation::*;
|
|
|
|
|
+// let base_pod_dir = match &flowcell.location {
|
|
|
|
|
+// Local(_) => None,
|
|
|
|
|
+// Archived(pod_tar) => {
|
|
|
|
|
+// let file = File::open(pod_tar)
|
|
|
|
|
+// .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?;
|
|
|
|
|
+// let mut archive = tar::Archive::new(file);
|
|
|
|
|
+// info!("Un-tar of archived {pod_tar}");
|
|
|
|
|
+// archive
|
|
|
|
|
+// .unpack(&config.unarchive_tmp_dir)
|
|
|
|
|
+// .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?;
|
|
|
|
|
+// info!("Un-tar of archived {pod_tar} Done.");
|
|
|
|
|
+//
|
|
|
|
|
+// Some(config.unarchive_tmp_dir.to_string())
|
|
|
|
|
+// }
|
|
|
|
|
+// };
|
|
|
|
|
+//
|
|
|
|
|
+// use crate::collection::flowcells::FlowCellExperiment::*;
|
|
|
|
|
+// match &flowcell.experiment {
|
|
|
|
|
+// WGSPod5Mux(pod_dir) => {
|
|
|
|
|
+// let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
|
|
|
|
|
+// format!("{base_pod_dir}/{pod_dir}")
|
|
|
|
|
+// } else {
|
|
|
|
|
+// pod_dir.clone()
|
|
|
|
|
+// };
|
|
|
|
|
+//
|
|
|
|
|
+// let cases = flowcell
|
|
|
|
|
+// .cases
|
|
|
|
|
+// .iter()
|
|
|
|
|
+// .map(|c| FlowCellCase {
|
|
|
|
|
+// id: c.case_id.clone(),
|
|
|
|
|
+// time_point: tp_conv(&c.sample_type),
|
|
|
|
|
+// barcode: c.barcode.clone(),
|
|
|
|
|
+// pod_dir: pod_dir.clone().into(),
|
|
|
|
|
+// })
|
|
|
|
|
+// .collect();
|
|
|
|
|
+// info!("Starting basecaller for muxed pod5: {cases:#?}");
|
|
|
|
|
+//
|
|
|
|
|
+// Dorado::from_mux(cases, config.clone())?;
|
|
|
|
|
+// }
|
|
|
|
|
+// WGSPod5Demux(pod_dir) => {
|
|
|
|
|
+// let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
|
|
|
|
|
+// format!("{base_pod_dir}/{pod_dir}")
|
|
|
|
|
+// } else {
|
|
|
|
|
+// pod_dir.clone()
|
|
|
|
|
+// };
|
|
|
|
|
+//
|
|
|
|
|
+// for c in flowcell.cases.iter() {
|
|
|
|
|
+// let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", ""));
|
|
|
|
|
+// info!("Starting basecaller for demuxed pod5: {pod_dir}");
|
|
|
|
|
+// let mut d = Dorado::init(
|
|
|
|
|
+// FlowCellCase {
|
|
|
|
|
+// id: c.case_id.clone(),
|
|
|
|
|
+// time_point: tp_conv(&c.sample_type),
|
|
|
|
|
+// barcode: c.barcode.clone(),
|
|
|
|
|
+// pod_dir: pod_dir.into(),
|
|
|
|
|
+// },
|
|
|
|
|
+// config.clone(),
|
|
|
|
|
+// )?;
|
|
|
|
|
+// d.run_pipe()?;
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// Ok(())
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|