|
|
@@ -1,530 +0,0 @@
|
|
|
-use std::{
|
|
|
- collections::HashMap,
|
|
|
- fmt,
|
|
|
- fs::{self, metadata},
|
|
|
- path::{Path, PathBuf},
|
|
|
- time::SystemTime,
|
|
|
-};
|
|
|
-
|
|
|
-use anyhow::Context;
|
|
|
-use chrono::{DateTime, Utc};
|
|
|
-use glob::glob;
|
|
|
-use log::{info, warn};
|
|
|
-
|
|
|
-use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
|
|
|
-use crate::{
|
|
|
- callers::{
|
|
|
- clairs::{ClairS, ClairSConfig},
|
|
|
- deep_variant::{DeepVariant, DeepVariantConfig},
|
|
|
- nanomonsv::{NanomonSV, NanomonSVConfig},
|
|
|
- },
|
|
|
- collection::pod5::FlowCellCase,
|
|
|
- commands::dorado::Dorado as BasecallAlign,
|
|
|
- config::Config,
|
|
|
- functions::{
|
|
|
- assembler::{Assembler, AssemblerConfig},
|
|
|
- variants::{Variants, VariantsConfig},
|
|
|
- whole_scan::{WholeScan, WholeScanConfig},
|
|
|
- },
|
|
|
-};
|
|
|
-
|
|
|
-pub mod bam;
|
|
|
-pub mod pod5;
|
|
|
-pub mod variants;
|
|
|
-pub mod vcf;
|
|
|
-
|
|
|
-#[derive(Debug, Clone)]
|
|
|
-pub struct CollectionsConfig {
|
|
|
- pub pod_dir: String,
|
|
|
- pub corrected_fc_path: String,
|
|
|
- pub result_dir: String,
|
|
|
- pub dict_file: String,
|
|
|
- pub min_diag_cov: f32,
|
|
|
- pub min_mrd_cov: f32,
|
|
|
-}
|
|
|
-
|
|
|
-impl Default for CollectionsConfig {
|
|
|
- fn default() -> Self {
|
|
|
- Self {
|
|
|
- pod_dir: "/data/run_data".to_string(),
|
|
|
- corrected_fc_path: "/data/flow_cells.tsv".to_string(),
|
|
|
- result_dir: "/data/longreads_basic_pipe".to_string(),
|
|
|
- dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
|
|
|
- min_diag_cov: 12.0,
|
|
|
- min_mrd_cov: 10.0,
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-#[derive(Debug)]
|
|
|
-pub struct Collections {
|
|
|
- pub config: CollectionsConfig,
|
|
|
- pub pod5: Pod5Collection,
|
|
|
- pub bam: BamCollection,
|
|
|
- pub vcf: VcfCollection,
|
|
|
- pub tasks: Vec<CollectionsTasks>,
|
|
|
-}
|
|
|
-
|
|
|
-impl Collections {
|
|
|
- pub fn new(config: CollectionsConfig) -> anyhow::Result<Self> {
|
|
|
- let CollectionsConfig {
|
|
|
- pod_dir,
|
|
|
- corrected_fc_path,
|
|
|
- result_dir,
|
|
|
- ..
|
|
|
- } = &config;
|
|
|
- let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
|
|
|
- let bam = BamCollection::new(result_dir);
|
|
|
- let vcf = VcfCollection::new(result_dir);
|
|
|
-
|
|
|
- Ok(Self {
|
|
|
- pod5,
|
|
|
- bam,
|
|
|
- vcf,
|
|
|
- tasks: Vec::new(),
|
|
|
- config,
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- pub fn todo(&mut self) -> anyhow::Result<()> {
|
|
|
- info!("Looking for base calling tasks...");
|
|
|
-
|
|
|
- let mut tasks = Vec::new();
|
|
|
- let mut to_demux = Vec::new();
|
|
|
- for run in self.pod5.runs.iter() {
|
|
|
- for fc in run.flowcells.iter() {
|
|
|
- let acq_id = fc.pod5_info.acquisition_id.clone();
|
|
|
- for case in fc.cases.iter() {
|
|
|
- let bams_ids: Vec<String> = self
|
|
|
- .bam
|
|
|
- .get(&case.id, &case.time_point)
|
|
|
- .iter()
|
|
|
- .flat_map(|b| {
|
|
|
- b.composition
|
|
|
- .iter()
|
|
|
- .map(|c| c.0.clone())
|
|
|
- .collect::<Vec<String>>()
|
|
|
- })
|
|
|
- .filter(|id| *id == acq_id)
|
|
|
- .collect();
|
|
|
- if bams_ids.is_empty() {
|
|
|
- match fc.pod5_type {
|
|
|
- pod5::Pod5Type::Raw => to_demux.push(case.clone()),
|
|
|
- pod5::Pod5Type::Demuxed => {
|
|
|
- tasks.push(CollectionsTasks::Align(case.clone()))
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Group for muxed and push task with all cases
|
|
|
- let mut grouped: HashMap<PathBuf, Vec<FlowCellCase>> = HashMap::new();
|
|
|
- for case in to_demux {
|
|
|
- grouped.entry(case.pod_dir.clone()).or_default().push(case);
|
|
|
- }
|
|
|
- grouped
|
|
|
- .into_values()
|
|
|
- .for_each(|data| tasks.push(CollectionsTasks::DemuxAlign(data)));
|
|
|
-
|
|
|
- // Whole scan
|
|
|
- for bam in self
|
|
|
- .bam
|
|
|
- .by_id_completed(self.config.min_diag_cov, self.config.min_mrd_cov)
|
|
|
- {
|
|
|
- let config = WholeScanConfig::default();
|
|
|
- let scan_dir = format!(
|
|
|
- "{}/{}/{}/{}",
|
|
|
- &config.result_dir, bam.id, bam.time_point, config.scan_dir
|
|
|
- );
|
|
|
- if PathBuf::from(&scan_dir).exists() {
|
|
|
- let dir_mod: DateTime<Utc> = fs::metadata(&scan_dir)?.modified()?.into();
|
|
|
- if bam.modified > dir_mod {
|
|
|
- fs::remove_dir_all(&scan_dir)?;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if !PathBuf::from(&scan_dir).exists() {
|
|
|
- tasks.push(CollectionsTasks::WholeScan {
|
|
|
- id: bam.id,
|
|
|
- time_point: bam.time_point,
|
|
|
- bam: bam
|
|
|
- .path
|
|
|
- .to_str()
|
|
|
- .context("Cant convert path to string")?
|
|
|
- .to_string(),
|
|
|
- config: WholeScanConfig::default(),
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Remove VCF anterior to BAM
|
|
|
- let vcf_by_id = self.vcf.group_by_id();
|
|
|
- vcf_by_id.iter().for_each(|(id, vcfs)| {
|
|
|
- if let (Some(diag), Some(mrd)) = (
|
|
|
- self.bam.get(id, "diag").first(),
|
|
|
- self.bam.get(id, "mrd").first(),
|
|
|
- ) {
|
|
|
- let diag_modified = diag.modified;
|
|
|
- let mrd_modified = mrd.modified;
|
|
|
- let mut rm_paths: Vec<&Path> = vcfs
|
|
|
- .iter()
|
|
|
- .flat_map(|vcf| {
|
|
|
- let vcf_mod: DateTime<Utc> = vcf
|
|
|
- .file_metadata
|
|
|
- .modified()
|
|
|
- .expect("Can't read VCF modified time.")
|
|
|
- .into();
|
|
|
-
|
|
|
- // For somatic caller erase if one bam (diag or mrd) is more recent.
|
|
|
- if vcf.caller != "DeepVariant" {
|
|
|
- if vcf_mod < diag_modified || vcf_mod < mrd_modified {
|
|
|
- vec![vcf.path.parent().unwrap()]
|
|
|
- } else {
|
|
|
- vec![]
|
|
|
- }
|
|
|
- } else if (vcf.time_point == "diag" && vcf_mod < diag_modified)
|
|
|
- || (vcf.time_point == "mrd" && vcf_mod < mrd_modified)
|
|
|
- {
|
|
|
- vec![vcf.path.parent().unwrap()]
|
|
|
- } else {
|
|
|
- vec![]
|
|
|
- }
|
|
|
- })
|
|
|
- .collect();
|
|
|
- rm_paths.sort();
|
|
|
- rm_paths.dedup();
|
|
|
- rm_paths
|
|
|
- .iter()
|
|
|
- .for_each(|p| fs::remove_dir_all(p).expect("Can't erase caller dir."))
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- // Variant calling
|
|
|
- info!("Looking for variant calling tasks...");
|
|
|
- self.bam.bams.iter().map(|b| b.id.clone()).for_each(|id| {
|
|
|
- if let (Some(diag), Some(mrd)) = (
|
|
|
- self.bam.get(&id, "diag").first(),
|
|
|
- self.bam.get(&id, "mrd").first(),
|
|
|
- ) {
|
|
|
- if let (Some(diag_cramino), Some(mrd_cramino)) = (&diag.cramino, &mrd.cramino) {
|
|
|
- if diag_cramino.mean_coverage >= self.config.min_diag_cov.into()
|
|
|
- && mrd_cramino.mean_coverage >= self.config.min_mrd_cov.into()
|
|
|
- {
|
|
|
- let caller_time: Vec<(&str, &str)> = vcf_by_id
|
|
|
- .iter()
|
|
|
- .filter(|(i, _)| *i == id)
|
|
|
- .flat_map(|(_, vcfs)| {
|
|
|
- vcfs.iter()
|
|
|
- .map(|vcf| (vcf.caller.as_str(), vcf.time_point.as_str()))
|
|
|
- })
|
|
|
- .collect();
|
|
|
-
|
|
|
- if !caller_time.contains(&("clairs", "diag"))
|
|
|
- || !caller_time.contains(&("clairs_indel", "diag"))
|
|
|
- {
|
|
|
- tasks.push(CollectionsTasks::ClairS {
|
|
|
- id: id.to_string(),
|
|
|
- diag_bam: diag.path.to_str().unwrap().to_string(),
|
|
|
- mrd_bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
- config: ClairSConfig::default(),
|
|
|
- });
|
|
|
- }
|
|
|
- if !caller_time.contains(&("DeepVariant", "diag")) {
|
|
|
- tasks.push(CollectionsTasks::DeepVariant {
|
|
|
- id: id.to_string(),
|
|
|
- time_point: "diag".to_string(),
|
|
|
- bam: diag.path.to_str().unwrap().to_string(),
|
|
|
- config: DeepVariantConfig::default(),
|
|
|
- });
|
|
|
- }
|
|
|
- if !caller_time.contains(&("DeepVariant", "mrd")) {
|
|
|
- tasks.push(CollectionsTasks::DeepVariant {
|
|
|
- id: id.to_string(),
|
|
|
- time_point: "mrd".to_string(),
|
|
|
- bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
- config: DeepVariantConfig::default(),
|
|
|
- });
|
|
|
- }
|
|
|
- if !caller_time.contains(&("nanomonsv", "diag")) {
|
|
|
- tasks.push(CollectionsTasks::NanomonSV {
|
|
|
- id: id.to_string(),
|
|
|
- diag_bam: diag.path.to_str().unwrap().to_string(),
|
|
|
- mrd_bam: mrd.path.to_str().unwrap().to_string(),
|
|
|
- config: NanomonSVConfig::default(),
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- // Variants aggregation
|
|
|
- // info!("Looking for variants aggregation tasks...");
|
|
|
- // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| {
|
|
|
- // let id = bam.id;
|
|
|
- // });
|
|
|
-
|
|
|
- // de novo
|
|
|
- tasks.extend(self.todo_assembler()?);
|
|
|
-
|
|
|
- // Tasks sorting and dedup
|
|
|
- let mut hs = HashMap::new();
|
|
|
- tasks.into_iter().for_each(|t| {
|
|
|
- hs.insert(t.to_string(), t);
|
|
|
- });
|
|
|
- self.tasks = hs.into_values().collect();
|
|
|
- Ok(())
|
|
|
- }
|
|
|
-
|
|
|
- pub fn tasks_dedup(&mut self) {
|
|
|
- let mut hs = HashMap::new();
|
|
|
- self.tasks.clone().into_iter().for_each(|t| {
|
|
|
- hs.insert(t.to_string(), t);
|
|
|
- });
|
|
|
- self.tasks = hs.into_values().collect();
|
|
|
- }
|
|
|
-
|
|
|
- pub fn todo_assembler(&mut self) -> anyhow::Result<Vec<CollectionsTasks>> {
|
|
|
- let mut tasks = Vec::new();
|
|
|
- let config = AssemblerConfig::default();
|
|
|
- for b in &self.bam.bams {
|
|
|
- let assemblies_dir = format!(
|
|
|
- "{}/{}/{}/{}",
|
|
|
- config.result_dir, b.id, b.time_point, config.output_dir_name
|
|
|
- );
|
|
|
-
|
|
|
- if !Path::new(&assemblies_dir).exists() {
|
|
|
- tasks.push(CollectionsTasks::Assemble {
|
|
|
- id: b.id.clone(),
|
|
|
- time_point: b.time_point.clone(),
|
|
|
- config: config.clone(),
|
|
|
- });
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- let pattern = format!("{assemblies_dir}/*/*.bam");
|
|
|
- let mut mtimes: Vec<SystemTime> = glob(&pattern)?
|
|
|
- .filter_map(|entry| entry.ok())
|
|
|
- .filter_map(|path| metadata(path).ok()?.modified().ok())
|
|
|
- .collect();
|
|
|
-
|
|
|
- if mtimes.is_empty() {
|
|
|
- tasks.push(CollectionsTasks::Assemble {
|
|
|
- id: b.id.clone(),
|
|
|
- time_point: b.time_point.clone(),
|
|
|
- config: config.clone(),
|
|
|
- });
|
|
|
- continue;
|
|
|
- }
|
|
|
- mtimes.sort_unstable();
|
|
|
- mtimes.dedup();
|
|
|
- let max_mtime: DateTime<Utc> =
|
|
|
- mtimes.last().context("No modified time")?.to_owned().into();
|
|
|
- if b.modified > max_mtime {
|
|
|
- tasks.push(CollectionsTasks::Assemble {
|
|
|
- id: b.id.clone(),
|
|
|
- time_point: b.time_point.clone(),
|
|
|
- config: config.clone(),
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Ok(tasks)
|
|
|
- }
|
|
|
-
|
|
|
- pub fn run(&mut self) -> anyhow::Result<()> {
|
|
|
- // self.tasks.reverse();
|
|
|
- if self.tasks.is_empty() {
|
|
|
- self.todo()?;
|
|
|
- if self.tasks.is_empty() {
|
|
|
- return Ok(());
|
|
|
- } else {
|
|
|
- self.run()?;
|
|
|
- }
|
|
|
- } else {
|
|
|
- let n_tasks = self.tasks.len();
|
|
|
- warn!("{n_tasks} tasks to run");
|
|
|
- let mut i = 1;
|
|
|
- while let Some(task) = self.tasks.pop() {
|
|
|
- warn!("Running {i}/{n_tasks}");
|
|
|
- info!("{task:#?}");
|
|
|
- task.run()?;
|
|
|
- i += 1;
|
|
|
- }
|
|
|
- }
|
|
|
- Ok(())
|
|
|
- }
|
|
|
-}
|
|
|
-use strum_macros::EnumOrd;
|
|
|
-#[derive(Debug, Clone, EnumOrd)]
|
|
|
-pub enum CollectionsTasks {
|
|
|
- Align(FlowCellCase),
|
|
|
- DemuxAlign(Vec<FlowCellCase>),
|
|
|
- WholeScan {
|
|
|
- id: String,
|
|
|
- time_point: String,
|
|
|
- bam: String,
|
|
|
- config: WholeScanConfig,
|
|
|
- },
|
|
|
- Assemble {
|
|
|
- id: String,
|
|
|
- time_point: String,
|
|
|
- config: AssemblerConfig,
|
|
|
- },
|
|
|
- DeepVariant {
|
|
|
- id: String,
|
|
|
- time_point: String,
|
|
|
- bam: String,
|
|
|
- config: DeepVariantConfig,
|
|
|
- },
|
|
|
- ClairS {
|
|
|
- id: String,
|
|
|
- diag_bam: String,
|
|
|
- mrd_bam: String,
|
|
|
- config: ClairSConfig,
|
|
|
- },
|
|
|
- NanomonSV {
|
|
|
- id: String,
|
|
|
- diag_bam: String,
|
|
|
- mrd_bam: String,
|
|
|
- config: NanomonSVConfig,
|
|
|
- },
|
|
|
- Variants {
|
|
|
- id: String,
|
|
|
- config: VariantsConfig,
|
|
|
- },
|
|
|
-}
|
|
|
-
|
|
|
-impl CollectionsTasks {
|
|
|
- pub fn run(self) -> anyhow::Result<()> {
|
|
|
- match self {
|
|
|
- CollectionsTasks::Align(case) => {
|
|
|
- BasecallAlign::init(case.clone(), Config::default())?.run_pipe()?;
|
|
|
- }
|
|
|
- CollectionsTasks::DemuxAlign(cases) => {
|
|
|
- BasecallAlign::from_mux(cases, Config::default())?;
|
|
|
- }
|
|
|
- CollectionsTasks::DeepVariant {
|
|
|
- id,
|
|
|
- time_point,
|
|
|
- bam,
|
|
|
- config,
|
|
|
- } => {
|
|
|
- DeepVariant::new(&id, &time_point, &bam, config).run();
|
|
|
- }
|
|
|
- CollectionsTasks::ClairS {
|
|
|
- id,
|
|
|
- diag_bam,
|
|
|
- mrd_bam,
|
|
|
- config,
|
|
|
- } => {
|
|
|
- ClairS::new(&id, &diag_bam, &mrd_bam, config).run();
|
|
|
- }
|
|
|
- CollectionsTasks::NanomonSV {
|
|
|
- id,
|
|
|
- diag_bam,
|
|
|
- mrd_bam,
|
|
|
- config,
|
|
|
- } => {
|
|
|
- NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run();
|
|
|
- }
|
|
|
- CollectionsTasks::WholeScan {
|
|
|
- id,
|
|
|
- time_point,
|
|
|
- bam,
|
|
|
- config,
|
|
|
- } => {
|
|
|
- WholeScan::new(id, time_point, bam, config)?.run()?;
|
|
|
- }
|
|
|
- CollectionsTasks::Variants { id, config } => {
|
|
|
- Variants::new(id, config).run()?;
|
|
|
- }
|
|
|
- CollectionsTasks::Assemble {
|
|
|
- id,
|
|
|
- time_point,
|
|
|
- config,
|
|
|
- } => {
|
|
|
- Assembler::new(id, time_point, config).run()?;
|
|
|
- }
|
|
|
- }
|
|
|
- Ok(())
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// Implement Display for CollectionsTasks
|
|
|
-impl fmt::Display for CollectionsTasks {
|
|
|
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
- use CollectionsTasks::*;
|
|
|
-
|
|
|
- match self {
|
|
|
- Align(case) => write!(f, "Align task with: {:#?}", case),
|
|
|
- DemuxAlign(cases) => write!(f, "DemuxAlign task with: {:#?}", cases),
|
|
|
- DeepVariant {
|
|
|
- id,
|
|
|
- time_point,
|
|
|
- bam,
|
|
|
- ..
|
|
|
- } => {
|
|
|
- write!(
|
|
|
- f,
|
|
|
- "DeepVariant task with id: {}, time_point: {}, bam: {}",
|
|
|
- id, time_point, bam
|
|
|
- )
|
|
|
- }
|
|
|
- ClairS {
|
|
|
- id,
|
|
|
- diag_bam,
|
|
|
- mrd_bam,
|
|
|
- ..
|
|
|
- } => {
|
|
|
- write!(
|
|
|
- f,
|
|
|
- "ClairS task with id: {}, diag_bam: {}, mrd_bam: {}",
|
|
|
- id, diag_bam, mrd_bam
|
|
|
- )
|
|
|
- }
|
|
|
- NanomonSV {
|
|
|
- id,
|
|
|
- diag_bam,
|
|
|
- mrd_bam,
|
|
|
- ..
|
|
|
- } => {
|
|
|
- write!(
|
|
|
- f,
|
|
|
- "NanomonSV task with id: {}, diag_bam: {}, mrd_bam: {}",
|
|
|
- id, diag_bam, mrd_bam
|
|
|
- )
|
|
|
- }
|
|
|
- WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam),
|
|
|
- Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id),
|
|
|
- Assemble { id, time_point, .. } => {
|
|
|
- write!(f, "Assembly for id: {}, time point: {}", id, time_point)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-pub fn run_tasks(config: CollectionsConfig) -> anyhow::Result<()> {
|
|
|
- let mut last_n = Vec::new();
|
|
|
- loop {
|
|
|
- let mut collection = Collections::new(config.clone())?;
|
|
|
- collection.todo()?;
|
|
|
- if collection.tasks.is_empty() {
|
|
|
- warn!("All results are update");
|
|
|
- break;
|
|
|
- }
|
|
|
- let n_tasks = collection.tasks.len();
|
|
|
- warn!("{n_tasks} tasks to run");
|
|
|
- if last_n.len() > 2
|
|
|
- && last_n[last_n.len() - 1] == n_tasks
|
|
|
- && last_n[last_n.len() - 2] == n_tasks
|
|
|
- {
|
|
|
- warn!("Tasks don't progress");
|
|
|
- break;
|
|
|
- }
|
|
|
- last_n.push(n_tasks);
|
|
|
- collection.run()?;
|
|
|
- }
|
|
|
- Ok(())
|
|
|
-}
|