|
|
@@ -0,0 +1,530 @@
|
|
|
+use std::{
|
|
|
+ collections::HashMap,
|
|
|
+ fmt,
|
|
|
+ fs::{self, metadata},
|
|
|
+ path::{Path, PathBuf},
|
|
|
+ time::SystemTime,
|
|
|
+};
|
|
|
+
|
|
|
+use anyhow::Context;
|
|
|
+use chrono::{DateTime, Utc};
|
|
|
+use glob::glob;
|
|
|
+use log::{info, warn};
|
|
|
+
|
|
|
+use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
|
|
|
+use crate::{
|
|
|
+ callers::{
|
|
|
+ clairs::{ClairS, ClairSConfig},
|
|
|
+ deep_variant::{DeepVariant, DeepVariantConfig},
|
|
|
+ nanomonsv::{NanomonSV, NanomonSVConfig},
|
|
|
+ },
|
|
|
+ collection::pod5::FlowCellCase,
|
|
|
+ commands::dorado::Dorado as BasecallAlign,
|
|
|
+ config::Config,
|
|
|
+ functions::{
|
|
|
+ assembler::{Assembler, AssemblerConfig},
|
|
|
+ variants::{Variants, VariantsConfig},
|
|
|
+ whole_scan::{WholeScan, WholeScanConfig},
|
|
|
+ },
|
|
|
+};
|
|
|
+
|
|
|
+pub mod bam;
|
|
|
+pub mod pod5;
|
|
|
+pub mod variants;
|
|
|
+pub mod vcf;
|
|
|
+
|
|
|
+#[derive(Debug, Clone)]
|
|
|
+pub struct CollectionsConfig {
|
|
|
+ pub pod_dir: String,
|
|
|
+ pub corrected_fc_path: String,
|
|
|
+ pub result_dir: String,
|
|
|
+ pub dict_file: String,
|
|
|
+ pub min_diag_cov: f32,
|
|
|
+ pub min_mrd_cov: f32,
|
|
|
+}
|
|
|
+
|
|
|
+impl Default for CollectionsConfig {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self {
|
|
|
+ pod_dir: "/data/run_data".to_string(),
|
|
|
+ corrected_fc_path: "/data/flow_cells.tsv".to_string(),
|
|
|
+ result_dir: "/data/longreads_basic_pipe".to_string(),
|
|
|
+ dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
|
|
|
+ min_diag_cov: 12.0,
|
|
|
+ min_mrd_cov: 10.0,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct Collections {
|
|
|
+ pub config: CollectionsConfig,
|
|
|
+ pub pod5: Pod5Collection,
|
|
|
+ pub bam: BamCollection,
|
|
|
+ pub vcf: VcfCollection,
|
|
|
+ pub tasks: Vec<CollectionsTasks>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Collections {
|
|
|
+ pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
|
|
|
+ let CollectionsConfig {
|
|
|
+ pod_dir,
|
|
|
+ corrected_fc_path,
|
|
|
+ result_dir,
|
|
|
+ ..
|
|
|
+ } = &config;
|
|
|
+ let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
|
|
|
+ let bam = BamCollection::new(result_dir);
|
|
|
+ let vcf = VcfCollection::new(result_dir);
|
|
|
+
|
|
|
+ Ok(Self {
|
|
|
+ pod5,
|
|
|
+ bam,
|
|
|
+ vcf,
|
|
|
+ tasks: Vec::new(),
|
|
|
+ config,
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn todo(&mut self) -> anyhow::Result<()> {
|
|
|
+ info!("Looking for base calling tasks...");
|
|
|
+
|
|
|
+ let mut tasks = Vec::new();
|
|
|
+ let mut to_demux = Vec::new();
|
|
|
+ for run in self.pod5.runs.iter() {
|
|
|
+ for fc in run.flowcells.iter() {
|
|
|
+ let acq_id = fc.pod5_info.acquisition_id.clone();
|
|
|
+ for case in fc.cases.iter() {
|
|
|
+ let bams_ids: Vec<String> = self
|
|
|
+ .bam
|
|
|
+ .get(&case.id, &case.time_point)
|
|
|
+ .iter()
|
|
|
+ .flat_map(|b| {
|
|
|
+ b.composition
|
|
|
+ .iter()
|
|
|
+ .map(|c| c.0.clone())
|
|
|
+ .collect::<Vec<String>>()
|
|
|
+ })
|
|
|
+ .filter(|id| *id == acq_id)
|
|
|
+ .collect();
|
|
|
+ if bams_ids.is_empty() {
|
|
|
+ match fc.pod5_type {
|
|
|
+ pod5::Pod5Type::Raw => to_demux.push(case.clone()),
|
|
|
+ pod5::Pod5Type::Demuxed => {
|
|
|
+ tasks.push(CollectionsTasks::Align(case.clone()))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Group for muxed and push task with all cases
|
|
|
+ let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
|
|
|
+ for case in to_demux {
|
|
|
+ grouped.entry(case.pod_dir.clone()).or_default().push(case);
|
|
|
+ }
|
|
|
+ grouped
|
|
|
+ .into_values()
|
|
|
+ .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
|
|
|
+
|
|
|
+ // Whole scan
|
|
|
+ for bam in self
|
|
|
+ .bam
|
|
|
+ .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
|
|
|
+ {
|
|
|
+ let config = WholeScanConfig::default();
|
|
|
+ let scan_dir = format!(
|
|
|
+ "{}/{}/{}/{}",
|
|
|
+ &config.result_dir, bam.id, bam.time_point, config.scan_dir
|
|
|
+ );
|
|
|
+ if PathBuf::from(&scan_dir).exists() {
|
|
|
+ let dir_mod: DateTime<Utc> = fs::metadata(&scan_dir)?.modified()?.into();
|
|
|
+ if bam.modified > dir_mod {
|
|
|
+ fs::remove_dir_all(&scan_dir)?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if !PathBuf::from(&scan_dir).exists() {
|
|
|
+ tasks.push(CollectionsTasks::WholeScan {
|
|
|
+ id: bam.id,
|
|
|
+ time_point: bam.time_point,
|
|
|
+ bam: bam
|
|
|
+ .path
|
|
|
+ .to_str()
|
|
|
+ .context("Cant convert path to string")?
|
|
|
+ .to_string(),
|
|
|
+ config: WholeScanConfig::default(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove VCF anterior to BAM
|
|
|
+ let vcf_by_id = self.vcf.group_by_id();
|
|
|
+ vcf_by_id.iter().for_each(|(id, vcfs)| {
|
|
|
+ if let (Some(diag), Some(mrd)) = (
|
|
|
+ self.bam.get(id, "diag").first(),
|
|
|
+ self.bam.get(id, "mrd").first(),
|
|
|
+ ) {
|
|
|
+ let diag_modified = diag.modified;
|
|
|
+ let mrd_modified = mrd.modified;
|
|
|
+ let mut rm_paths: Vec<&Path> = vcfs
|
|
|
+ .iter()
|
|
|
+ .flat_map(|vcf| {
|
|
|
+ let vcf_mod: DateTime<Utc> = vcf
|
|
|
+ .file_metadata
|
|
|
+ .modified()
|
|
|
+ .expect("Can't read VCF modified time.")
|
|
|
+ .into();
|
|
|
+
|
|
|
+ // For somatic caller erase if one bam (diag or mrd) is more recent.
|
|
|
+ if vcf.caller != "DeepVariant" {
|
|
|
+ if vcf_mod < diag_modified || vcf_mod < mrd_modified {
|
|
|
+ vec![vcf.path.parent().unwrap()]
|
|
|
+ } else {
|
|
|
+ vec![]
|
|
|
+ }
|
|
|
+ } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
|
|
|
+ || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
|
|
|
+ {
|
|
|
+ vec![vcf.path.parent().unwrap()]
|
|
|
+ } else {
|
|
|
+ vec![]
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .collect();
|
|
|
+ rm_paths.sort();
|
|
|
+ rm_paths.dedup();
|
|
|
+ rm_paths
|
|
|
+ .iter()
|
|
|
+ .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Variant calling
|
|
|
+ info!("Looking for variant calling tasks...");
|
|
|
+ self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
|
|
|
+ if let (Some(diag), Some(mrd)) = (
|
|
|
+ self.bam.get(&id, "diag").first(),
|
|
|
+ self.bam.get(&id, "mrd").first(),
|
|
|
+ ) {
|
|
|
+ if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
|
|
|
+ if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
|
|
|
+ && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
|
|
|
+ {
|
|
|
+ let caller_time: Vec<(&str, &str)> = vcf_by_id
|
|
|
+ .iter()
|
|
|
+ .filter(|(i, _)| *i == id)
|
|
|
+ .flat_map(|(_, vcfs)| {
|
|
|
+ vcfs.iter()
|
|
|
+ .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
|
|
|
+ })
|
|
|
+ .collect();
|
|
|
+
|
|
|
+ if !caller_time.contains(&("clairs", "diag"))
|
|
|
+ || !caller_time.contains(&("clairs_indel", "diag"))
|
|
|
+ {
|
|
|
+ tasks.push(CollectionsTasks::ClairS {
|
|
|
+ id: id.to_string(),
|
|
|
+ diag_bam: diag.path.to_str().unwrap().to_string(),
|
|
|
+ mrd_bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
+ config: ClairSConfig::default(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if !caller_time.contains(&("DeepVariant", "diag")) {
|
|
|
+ tasks.push(CollectionsTasks::DeepVariant {
|
|
|
+ id: id.to_string(),
|
|
|
+ time_point: "diag".to_string(),
|
|
|
+ bam: diag.path.to_str().unwrap().to_string(),
|
|
|
+ config: DeepVariantConfig::default(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if !caller_time.contains(&("DeepVariant", "mrd")) {
|
|
|
+ tasks.push(CollectionsTasks::DeepVariant {
|
|
|
+ id: id.to_string(),
|
|
|
+ time_point: "mrd".to_string(),
|
|
|
+ bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
+ config: DeepVariantConfig::default(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if !caller_time.contains(&("nanomonsv", "diag")) {
|
|
|
+ tasks.push(CollectionsTasks::NanomonSV {
|
|
|
+ id: id.to_string(),
|
|
|
+ diag_bam: diag.path.to_str().unwrap().to_string(),
|
|
|
+ mrd_bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
+ config: NanomonSVConfig::default(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Variants aggregation
|
|
|
+ // info!("Looking for variants aggregation tasks...");
|
|
|
+ // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| {
|
|
|
+ // let id = bam.id;
|
|
|
+ // });
|
|
|
+
|
|
|
+ // de novo
|
|
|
+ tasks.extend(self.todo_assembler()?);
|
|
|
+
|
|
|
+ // Tasks sorting and dedup
|
|
|
+ let mut hs = HashMap::new();
|
|
|
+ tasks.into_iter().for_each(|t| {
|
|
|
+ hs.insert(t.to_string(), t);
|
|
|
+ });
|
|
|
+ self.tasks = hs.into_values().collect();
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn tasks_dedup(&mut self) {
|
|
|
+ let mut hs = HashMap::new();
|
|
|
+ self.tasks.clone().into_iter().for_each(|t| {
|
|
|
+ hs.insert(t.to_string(), t);
|
|
|
+ });
|
|
|
+ self.tasks = hs.into_values().collect();
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn todo_assembler(&mut self) -> anyhow::Result<Vec<CollectionsTasks>> {
|
|
|
+ let mut tasks = Vec::new();
|
|
|
+ let config = AssemblerConfig::default();
|
|
|
+ for b in &self.bam.bams {
|
|
|
+ let assemblies_dir = format!(
|
|
|
+ "{}/{}/{}/{}",
|
|
|
+ config.result_dir, b.id, b.time_point, config.output_dir_name
|
|
|
+ );
|
|
|
+
|
|
|
+ if !Path::new(&assemblies_dir).exists() {
|
|
|
+ tasks.push(CollectionsTasks::Assemble {
|
|
|
+ id: b.id.clone(),
|
|
|
+ time_point: b.time_point.clone(),
|
|
|
+ config: config.clone(),
|
|
|
+ });
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let pattern = format!("{assemblies_dir}/*/*.bam");
|
|
|
+ let mut mtimes: Vec<SystemTime> = glob(&pattern)?
|
|
|
+ .filter_map(|entry| entry.ok())
|
|
|
+ .filter_map(|path| metadata(path).ok()?.modified().ok())
|
|
|
+ .collect();
|
|
|
+
|
|
|
+ if mtimes.is_empty() {
|
|
|
+ tasks.push(CollectionsTasks::Assemble {
|
|
|
+ id: b.id.clone(),
|
|
|
+ time_point: b.time_point.clone(),
|
|
|
+ config: config.clone(),
|
|
|
+ });
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ mtimes.sort_unstable();
|
|
|
+ mtimes.dedup();
|
|
|
+ let max_mtime: DateTime<Utc> =
|
|
|
+ mtimes.last().context("No modified time")?.to_owned().into();
|
|
|
+ if b.modified > max_mtime {
|
|
|
+ tasks.push(CollectionsTasks::Assemble {
|
|
|
+ id: b.id.clone(),
|
|
|
+ time_point: b.time_point.clone(),
|
|
|
+ config: config.clone(),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(tasks)
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn run(&mut self) -> anyhow::Result<()> {
|
|
|
+ // self.tasks.reverse();
|
|
|
+ if self.tasks.is_empty() {
|
|
|
+ self.todo()?;
|
|
|
+ if self.tasks.is_empty() {
|
|
|
+ return Ok(());
|
|
|
+ } else {
|
|
|
+ self.run()?;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ let n_tasks = self.tasks.len();
|
|
|
+ warn!("{n_tasks} tasks to run");
|
|
|
+ let mut i = 1;
|
|
|
+ while let Some(task) = self.tasks.pop() {
|
|
|
+ warn!("Running {i}/{n_tasks}");
|
|
|
+ info!("{task:#?}");
|
|
|
+ task.run()?;
|
|
|
+ i += 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|
|
|
+use strum_macros::EnumOrd;
|
|
|
+#[derive(Debug, Clone, EnumOrd)]
|
|
|
+pub enum CollectionsTasks {
|
|
|
+ Align(FlowCellCase),
|
|
|
+ DemuxAlign(Vec<FlowCellCase>),
|
|
|
+ WholeScan {
|
|
|
+ id: String,
|
|
|
+ time_point: String,
|
|
|
+ bam: String,
|
|
|
+ config: WholeScanConfig,
|
|
|
+ },
|
|
|
+ Assemble {
|
|
|
+ id: String,
|
|
|
+ time_point: String,
|
|
|
+ config: AssemblerConfig,
|
|
|
+ },
|
|
|
+ DeepVariant {
|
|
|
+ id: String,
|
|
|
+ time_point: String,
|
|
|
+ bam: String,
|
|
|
+ config: DeepVariantConfig,
|
|
|
+ },
|
|
|
+ ClairS {
|
|
|
+ id: String,
|
|
|
+ diag_bam: String,
|
|
|
+ mrd_bam: String,
|
|
|
+ config: ClairSConfig,
|
|
|
+ },
|
|
|
+ NanomonSV {
|
|
|
+ id: String,
|
|
|
+ diag_bam: String,
|
|
|
+ mrd_bam: String,
|
|
|
+ config: NanomonSVConfig,
|
|
|
+ },
|
|
|
+ Variants {
|
|
|
+ id: String,
|
|
|
+ config: VariantsConfig,
|
|
|
+ },
|
|
|
+}
|
|
|
+
|
|
|
+impl CollectionsTasks {
|
|
|
+ pub fn run(self) -> anyhow::Result<()> {
|
|
|
+ match self {
|
|
|
+ CollectionsTasks::Align(case) => {
|
|
|
+ BasecallAlign::init(case.clone(), Config::default())?.run_pipe()?;
|
|
|
+ }
|
|
|
+ CollectionsTasks::DemuxAlign(cases) => {
|
|
|
+ BasecallAlign::from_mux(cases, Config::default())?;
|
|
|
+ }
|
|
|
+ CollectionsTasks::DeepVariant {
|
|
|
+ id,
|
|
|
+ time_point,
|
|
|
+ bam,
|
|
|
+ config,
|
|
|
+ } => {
|
|
|
+ DeepVariant::new(&id, &time_point, &bam, config).run();
|
|
|
+ }
|
|
|
+ CollectionsTasks::ClairS {
|
|
|
+ id,
|
|
|
+ diag_bam,
|
|
|
+ mrd_bam,
|
|
|
+ config,
|
|
|
+ } => {
|
|
|
+ ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
|
|
|
+ }
|
|
|
+ CollectionsTasks::NanomonSV {
|
|
|
+ id,
|
|
|
+ diag_bam,
|
|
|
+ mrd_bam,
|
|
|
+ config,
|
|
|
+ } => {
|
|
|
+ NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
|
|
|
+ }
|
|
|
+ CollectionsTasks::WholeScan {
|
|
|
+ id,
|
|
|
+ time_point,
|
|
|
+ bam,
|
|
|
+ config,
|
|
|
+ } => {
|
|
|
+ WholeScan::new(id, time_point, bam, config)?.run()?;
|
|
|
+ }
|
|
|
+ CollectionsTasks::Variants { id, config } => {
|
|
|
+ Variants::new(id, config).run()?;
|
|
|
+ }
|
|
|
+ CollectionsTasks::Assemble {
|
|
|
+ id,
|
|
|
+ time_point,
|
|
|
+ config,
|
|
|
+ } => {
|
|
|
+ Assembler::new(id, time_point, config).run()?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Implement Display for CollectionsTasks
|
|
|
+impl fmt::Display for CollectionsTasks {
|
|
|
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
+ use CollectionsTasks::*;
|
|
|
+
|
|
|
+ match self {
|
|
|
+ Align(case) => write!(f, "Align task with: {:#?}", case),
|
|
|
+ DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases),
|
|
|
+ DeepVariant {
|
|
|
+ id,
|
|
|
+ time_point,
|
|
|
+ bam,
|
|
|
+ ..
|
|
|
+ } => {
|
|
|
+ write!(
|
|
|
+ f,
|
|
|
+ "DeepVariant task with id: {}, time_point: {}, bam: {}",
|
|
|
+ id, time_point, bam
|
|
|
+ )
|
|
|
+ }
|
|
|
+ ClairS {
|
|
|
+ id,
|
|
|
+ diag_bam,
|
|
|
+ mrd_bam,
|
|
|
+ ..
|
|
|
+ } => {
|
|
|
+ write!(
|
|
|
+ f,
|
|
|
+ "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}",
|
|
|
+ id, diag_bam, mrd_bam
|
|
|
+ )
|
|
|
+ }
|
|
|
+ NanomonSV {
|
|
|
+ id,
|
|
|
+ diag_bam,
|
|
|
+ mrd_bam,
|
|
|
+ ..
|
|
|
+ } => {
|
|
|
+ write!(
|
|
|
+ f,
|
|
|
+ "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}",
|
|
|
+ id, diag_bam, mrd_bam
|
|
|
+ )
|
|
|
+ }
|
|
|
+ WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam),
|
|
|
+ Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id),
|
|
|
+ Assemble { id, time_point, .. } => {
|
|
|
+ write!(f, "Assembly for id: {}, time point: {}", id, time_point)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
|
|
|
+ let mut last_n = Vec::new();
|
|
|
+ loop {
|
|
|
+ let mut collection = Collections::new(config.clone())?;
|
|
|
+ collection.todo()?;
|
|
|
+ if collection.tasks.is_empty() {
|
|
|
+ warn!("All results are update");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ let n_tasks = collection.tasks.len();
|
|
|
+ warn!("{n_tasks} tasks to run");
|
|
|
+ if last_n.len() > 2
|
|
|
+ && last_n[last_n.len() - 1] == n_tasks
|
|
|
+ && last_n[last_n.len() - 2] == n_tasks
|
|
|
+ {
|
|
|
+ warn!("Tasks don't progress");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ last_n.push(n_tasks);
|
|
|
+ collection.run()?;
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+}
|