Your Name 1 anno fa
parent
commit
d08c603a67

+ 235 - 0
:

@@ -0,0 +1,235 @@
+use std::{
+    fs::{self, File, Metadata}, io::Read, path::PathBuf, str::FromStr
+};
+
+use anyhow::{anyhow, Context};
+use glob::glob;
+use hashbrown::HashMap;
+use log::warn;
+use pandora_lib_bindings::{
+    progs::cramino::{Cramino, CraminoRes},
+    utils::RunBin,
+};
+use rayon::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct Bam {
+    pub id: String,
+    pub time_point: String,
+    pub reference_genome: String,
+    pub bam_type: BamType,
+    pub path: PathBuf,
+    #[serde(with = "metadata_serde")]
+    pub file_metadata: Metadata,
+    // #[serde(skip)]
+// pub file_metadata: Metadata,
+    pub cramino: Option<CraminoRes>,
+    pub composition: Vec<(String, f64)>,
+}
+
/// Assay classification of a BAM, parsed from the file-name stem.
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
pub enum BamType {
    /// Whole-genome sequencing (3-field file name, no assay field).
    WGS,
    /// Targeted panel; payload is the panel name (e.g. "oncoT").
    Panel(String),
    /// ChIP-seq; payload is the target mark (e.g. "H3K27ac", "H3K4me3").
    ChIP(String),
}
+
+impl Bam {
+    pub fn new(path: PathBuf) -> anyhow::Result<Self> {
+        let stem = path
+            .clone()
+            .file_stem()
+            .context("Can't parse stem from {path}")?
+            .to_string_lossy()
+            .to_string();
+        let stem: Vec<&str> = stem.split('_').collect();
+
+        if stem.len() > 4 || stem.len() < 3 {
+            return Err(anyhow!("Error in bam name: {}", path.display()));
+        }
+
+        let id = stem[0].to_string();
+        let time_point = stem[1].to_string();
+        let reference_genome = stem
+            .last()
+            .context("Can't get last from stem {stem}")?
+            .to_string();
+
+        let bam_type = if stem.len() == 4 {
+            match stem[2] {
+                "oncoT" => BamType::Panel("oncoT".to_string()),
+                "H3K27ac" => BamType::ChIP("H3K27ac".to_string()),
+                "H3K4me3" => BamType::ChIP("H3K4me3".to_string()),
+                _ => return Err(anyhow!("Error in bam name: {}", path.display())),
+            }
+        } else {
+            BamType::WGS
+        };
+
+        let tp_dir = path
+            .parent()
+            .context("Can't parse parent from: {bam_path}")?;
+        let cramino_path = format!(
+            "{}/{id}_{time_point}_hs1_cramino.txt",
+            tp_dir.to_string_lossy()
+        );
+        let file_metadata = fs::metadata(&path)?;
+
+        let cramino = if bam_type == BamType::WGS {
+            if !PathBuf::from_str(&cramino_path)?.exists() {
+                return Err(anyhow!("Cramino file missing {cramino_path}"));
+            }
+            let mut cramino = Cramino::default().with_result_path(&cramino_path);
+            cramino
+                .parse_results()
+                .context(format!("Error while parsing cramino for {cramino_path}"))?;
+
+            if let Some(cramino) = cramino.results {
+                Some(cramino)
+            } else {
+                return Err(anyhow!("Cramino results parsing failed"));
+            }
+        } else {
+            None
+        };
+
+        let composition =
+            pandora_lib_pileup::bam_compo(path.to_string_lossy().as_ref(), 20000).context(
+                format!("Error while reading BAM composition for {}", path.display()),
+            )?;
+
+        Ok(Self {
+            path,
+            // file_metadata,
+            cramino,
+            id: id.to_string(),
+            time_point: time_point.to_string(),
+            bam_type,
+            reference_genome,
+            composition,
+        })
+    }
+
+    pub fn load_json(path: &str) -> anyhow::Result<Self> {
+        let f = File::open(path)?;
+        let s: Self = serde_json::from_reader(f)?;
+        Ok(s)
+    }
+
+    pub fn save_json(path: &str) -> anyhow::Result<()> {
+
+    }
+}
+
/// Every BAM found under a result directory.
#[derive(Debug)]
pub struct BamCollection {
    /// BAMs successfully parsed by `Bam::new`; files that fail to parse are
    /// skipped with a warning during collection loading.
    pub bams: Vec<Bam>,
}
+
+impl BamCollection {
+    pub fn new(result_dir: &str) -> Self {
+        load_bam_collection(result_dir)
+    }
+
+    pub fn by_acquisition_id(&self) -> HashMap<String, Vec<&Bam>> {
+        let mut acq: HashMap<String, Vec<&Bam>> = HashMap::new();
+        for bam in self.bams.iter() {
+            for (acq_id, _) in bam.composition.iter() {
+                if let Some(entry) = acq.get_mut(acq_id) {
+                    entry.push(bam);
+                } else {
+                    acq.insert(acq_id.to_string(), vec![bam]);
+                }
+            }
+        }
+        acq
+    }
+
+    pub fn get(&self, id: &str, time_point: &str) -> Vec<&Bam> {
+        self.bams
+            .iter()
+            .filter(|b| b.id == id && b.time_point == time_point)
+            .collect()
+    }
+
+    pub fn by_id_completed(&self, min_diag_cov: f32, min_mrd_cov: f32) -> Vec<Bam> {
+        self.bams
+            .iter()
+            .filter(|b| matches!(b.bam_type, BamType::WGS))
+            .filter(|b| match &b.cramino {
+                Some(cramino) => match b.time_point.as_str() {
+                    "diag" => cramino.mean_length >= min_diag_cov as f64,
+                    "mrd" => cramino.mean_length >= min_mrd_cov as f64,
+                    _ => false
+                },
+                _ => false,
+            })
+            .cloned()
+            .collect()
+    }
+}
+
+pub fn load_bam_collection(result_dir: &str) -> BamCollection {
+    let pattern = format!("{}/*/*/*.bam", result_dir);
+    let bams = glob(&pattern)
+        .expect("Failed to read glob pattern")
+        .par_bridge()
+        .filter_map(|entry| {
+            match entry {
+                Ok(path) => match Bam::new(path) {
+                    Ok(bam) => return Some(bam),
+                    Err(e) => warn!("{e}"),
+                },
+                Err(e) => warn!("Error: {:?}", e),
+            }
+            None
+        })
+        .collect();
+
+    BamCollection { bams }
+}
+
+mod metadata_serde {
+    use super::*;
+    use serde::{Serializer, Deserializer};
+
+    #[derive(Serialize, Deserialize)]
+    struct SerializableMetadata {
+        len: u64,
+        modified: u64,
+        created: u64,
+    }
+
+    pub fn serialize<S>(metadata: &Metadata, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let serializable = SerializableMetadata {
+            len: metadata.len(),
+            modified: metadata.modified()
+                .unwrap_or(UNIX_EPOCH)
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_secs(),
+            created: metadata.created()
+                .unwrap_or(UNIX_EPOCH)
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_secs(),
+        };
+        serializable.serialize(serializer)
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Metadata, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let serializable = SerializableMetadata::deserialize(deserializer)?;
+        let file = tempfile::tempfile().map_err(serde::de::Error::custom)?;
+        let metadata = file.metadata().map_err(serde::de::Error::custom)?;
+        Ok(metadata)
+    }
+}
+
+

File diff suppressed because it is too large
+ 639 - 29
Cargo.lock


+ 4 - 0
Cargo.toml

@@ -12,6 +12,8 @@ pandora_lib_pod5 = { git = "https://git.t0m4.fr/Thomas/pandora_lib_pod5.git" }
 pandora_lib_pileup = { git = "https://git.t0m4.fr/Thomas/pandora_lib_pileup.git" }
 pandora_lib_bindings = { git = "https://git.t0m4.fr/Thomas/pandora_lib_bindings.git" }
 pandora_lib_scan = { git = "https://git.t0m4.fr/Thomas/pandora_lib_scan.git" }
+pandora_lib_variants = { git = "https://git.t0m4.fr/Thomas/pandora_lib_variants.git" }
+pandora_lib_assembler = { git = "https://git.t0m4.fr/Thomas/pandora_lib_assembler.git" }
 regex = "1.10.5"
 chrono = { version = "0.4.38", features = ["serde"] }
 csv = "1.3.0"
@@ -35,3 +37,5 @@ rayon = "1.10.0"
 hashbrown = { version = "0.14.5", features = ["rayon"] }
 ctrlc = "3.4.4"
 lazy_static = "1.5.0"
+indicatif = "0.17.8"
+tempfile = "3.12.0"

+ 1 - 1
src/callers/clairs.rs

@@ -5,7 +5,7 @@ use crate::{
 use std::{fs, path::Path};
 use tracing::info;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ClairSConfig {
     pub result_dir: String,
     pub reference: String,

+ 1 - 1
src/callers/deep_variant.rs

@@ -6,7 +6,7 @@ use crate::{
     runners::{run_wait, DockerRun},
 };
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DeepVariantConfig {
     pub bin_version: String,
     pub threads: u8,

+ 39 - 10
src/collection/bam.rs

@@ -1,10 +1,11 @@
 use std::{
-    fs::{self, Metadata},
+    fs::{self, File},
     path::PathBuf,
     str::FromStr,
 };
 
 use anyhow::{anyhow, Context};
+use chrono::{DateTime, Utc};
 use glob::glob;
 use hashbrown::HashMap;
 use log::warn;
@@ -13,20 +14,21 @@ use pandora_lib_bindings::{
     utils::RunBin,
 };
 use rayon::prelude::*;
+use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Bam {
     pub id: String,
     pub time_point: String,
     pub reference_genome: String,
     pub bam_type: BamType,
     pub path: PathBuf,
-    pub file_metadata: Metadata,
+    pub modified: DateTime<Utc>,
     pub cramino: Option<CraminoRes>,
     pub composition: Vec<(String, f64)>,
 }
 
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
 pub enum BamType {
     WGS,
     Panel(String),
@@ -49,6 +51,7 @@ impl Bam {
 
         let id = stem[0].to_string();
         let time_point = stem[1].to_string();
+
         let reference_genome = stem
             .last()
             .context("Can't get last from stem {stem}")?
@@ -64,15 +67,27 @@ impl Bam {
         } else {
             BamType::WGS
         };
+        let file_metadata = fs::metadata(&path)?;
+        let modified = file_metadata.modified()?;
 
         let tp_dir = path
             .parent()
             .context("Can't parse parent from: {bam_path}")?;
+
+        let json_path = format!(
+            "{}/{id}_{time_point}_{reference_genome}_info.json",
+            tp_dir.to_string_lossy()
+        );
+        let json_file = PathBuf::from(&json_path);
+
+        if json_file.exists() && json_file.metadata()?.modified()? > modified {
+            return Self::load_json(&json_path);
+        }
+
         let cramino_path = format!(
-            "{}/{id}_{time_point}_hs1_cramino.txt",
+            "{}/{id}_{time_point}_{reference_genome}_cramino.txt",
             tp_dir.to_string_lossy()
         );
-        let file_metadata = fs::metadata(&path)?;
 
         let cramino = if bam_type == BamType::WGS {
             if !PathBuf::from_str(&cramino_path)?.exists() {
@@ -97,16 +112,30 @@ impl Bam {
                 format!("Error while reading BAM composition for {}", path.display()),
             )?;
 
-        Ok(Self {
+        let s = Self {
             path,
-            file_metadata,
             cramino,
             id: id.to_string(),
             time_point: time_point.to_string(),
             bam_type,
             reference_genome,
             composition,
-        })
+            modified: modified.into(),
+        };
+        s.save_json(&json_path)?;
+        Ok(s)
+    }
+
+    pub fn load_json(path: &str) -> anyhow::Result<Self> {
+        let f = File::open(path)?;
+        let s: Self = serde_json::from_reader(f)?;
+        Ok(s)
+    }
+
+    pub fn save_json(&self, path: &str) -> anyhow::Result<()> {
+        let f = File::create(path)?;
+        serde_json::to_writer(f, self)?;
+        Ok(())
     }
 }
 
@@ -149,7 +178,7 @@ impl BamCollection {
                 Some(cramino) => match b.time_point.as_str() {
                     "diag" => cramino.mean_length >= min_diag_cov as f64,
                     "mrd" => cramino.mean_length >= min_mrd_cov as f64,
-                    _ => false
+                    _ => false,
                 },
                 _ => false,
             })

+ 104 - 21
src/collection/mod.rs

@@ -1,12 +1,15 @@
 use std::{
     collections::HashMap,
-    fmt, fs,
+    fmt,
+    fs::{self, metadata},
     path::{Path, PathBuf},
+    time::SystemTime,
 };
 
 use anyhow::Context;
+use chrono::{DateTime, Utc};
+use glob::glob;
 use log::{info, warn};
-use pandora_lib_scan::par_whole_scan;
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
@@ -18,7 +21,11 @@ use crate::{
     collection::pod5::FlowCellCase,
     commands::dorado::Dorado as BasecallAlign,
     config::Config,
-    functions::whole_scan::{WholeScan, WholeScanConfig},
+    functions::{
+        assembler::{Assembler, AssemblerConfig},
+        variants::{Variants, VariantsConfig},
+        whole_scan::{WholeScan, WholeScanConfig},
+    },
 };
 
 pub mod bam;
@@ -131,8 +138,8 @@ impl Collections {
                 &config.result_dir, bam.id, bam.time_point, config.scan_dir
             );
             if PathBuf::from(&scan_dir).exists() {
-                let dir_mod = fs::metadata(&scan_dir)?.modified()?;
-                if bam.file_metadata.modified()? > dir_mod {
+                let dir_mod: DateTime<Utc> = fs::metadata(&scan_dir)?.modified()?.into();
+                if bam.modified > dir_mod {
                     fs::remove_dir_all(&scan_dir)?;
                 }
             }
@@ -148,11 +155,6 @@ impl Collections {
                         .to_string(),
                     config: WholeScanConfig::default(),
                 });
-                // par_whole_scan(
-                //     "/data/ref/hs1/chm13v2.0.dict",
-                //     bam.path.to_str().context("Cant convert path to string")?,
-                //     &scan_dir,
-                // )?;
             }
         }
 
@@ -163,21 +165,16 @@ impl Collections {
                 self.bam.get(id, "diag").first(),
                 self.bam.get(id, "mrd").first(),
             ) {
-                let diag_modified = diag
-                    .file_metadata
-                    .modified()
-                    .expect("Can't read Bam modified time.");
-                let mrd_modified = mrd
-                    .file_metadata
-                    .modified()
-                    .expect("Can't read Bam modified time.");
+                let diag_modified = diag.modified;
+                let mrd_modified = mrd.modified;
                 let mut rm_paths: Vec<&Path> = vcfs
                     .iter()
                     .flat_map(|vcf| {
-                        let vcf_mod = vcf
+                        let vcf_mod: DateTime<Utc> = vcf
                             .file_metadata
                             .modified()
-                            .expect("Can't read VCF modified time.");
+                            .expect("Can't read VCF modified time.")
+                            .into();
 
                         // For somatic caller erase if one bam (diag or mrd) is more recent.
                         if vcf.caller != "DeepVariant" {
@@ -263,6 +260,13 @@ impl Collections {
         });
 
         // Variants aggregation
+        // info!("Looking for variants aggregation tasks...");
+        // self.bam.bams.iter().filter(|b| b.time_point == "diag" ).for_each(|bam| {
+        //     let id = bam.id;
+        // });
+
+        // de novo
+        tasks.extend(self.todo_assembler()?);
 
         // Tasks sorting and dedup
         let mut hs = HashMap::new();
@@ -273,6 +277,62 @@ impl Collections {
         Ok(())
     }
 
+    pub fn tasks_dedup(&mut self) {
+        let mut hs = HashMap::new();
+        self.tasks.clone().into_iter().for_each(|t| {
+            hs.insert(t.to_string(), t);
+        });
+        self.tasks = hs.into_values().collect();
+    }
+
+    pub fn todo_assembler(&mut self) -> anyhow::Result<Vec<CollectionsTasks>> {
+        let mut tasks = Vec::new();
+        let config = AssemblerConfig::default();
+        for b in &self.bam.bams {
+            let assemblies_dir = format!(
+                "{}/{}/{}/{}",
+                config.result_dir, b.id, b.time_point, config.output_dir_name
+            );
+
+            if !Path::new(&assemblies_dir).exists() {
+                tasks.push(CollectionsTasks::Assemble {
+                    id: b.id.clone(),
+                    time_point: b.time_point.clone(),
+                    config: config.clone(),
+                });
+                continue;
+            }
+
+            let pattern = format!("{assemblies_dir}/*/*.bam");
+            let mut mtimes: Vec<SystemTime> = glob(&pattern)?
+                .filter_map(|entry| entry.ok())
+                .filter_map(|path| metadata(path).ok()?.modified().ok())
+                .collect();
+
+            if mtimes.is_empty() {
+                tasks.push(CollectionsTasks::Assemble {
+                    id: b.id.clone(),
+                    time_point: b.time_point.clone(),
+                    config: config.clone(),
+                });
+                continue;
+            }
+            mtimes.sort_unstable();
+            mtimes.dedup();
+            let max_mtime: DateTime<Utc> =
+                mtimes.last().context("No modified time")?.to_owned().into();
+            if b.modified > max_mtime {
+                tasks.push(CollectionsTasks::Assemble {
+                    id: b.id.clone(),
+                    time_point: b.time_point.clone(),
+                    config: config.clone(),
+                });
+            }
+        }
+
+        Ok(tasks)
+    }
+
     pub fn run(&mut self) -> anyhow::Result<()> {
         // self.tasks.reverse();
         if self.tasks.is_empty() {
@@ -297,7 +357,7 @@ impl Collections {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum CollectionsTasks {
     Align(FlowCellCase),
     DemuxAlign(Vec<FlowCellCase>),
@@ -325,6 +385,15 @@ pub enum CollectionsTasks {
         bam: String,
         config: WholeScanConfig,
     },
+    Variants {
+        id: String,
+        config: VariantsConfig,
+    },
+    Assemble {
+        id: String,
+        time_point: String,
+        config: AssemblerConfig,
+    },
 }
 
 impl CollectionsTasks {
@@ -368,6 +437,16 @@ impl CollectionsTasks {
             } => {
                 WholeScan::new(id, time_point, bam, config)?.run()?;
             }
+            CollectionsTasks::Variants { id, config } => {
+                Variants::new(id, config).run()?;
+            }
+            CollectionsTasks::Assemble {
+                id,
+                time_point,
+                config,
+            } => {
+                Assembler::new(id, time_point, config).run()?;
+            }
         }
         Ok(())
     }
@@ -418,6 +497,10 @@ impl fmt::Display for CollectionsTasks {
                 )
             }
             WholeScan { id, bam, .. } => write!(f, "Whole scan for id: {}, bam: {}", id, bam),
+            Variants { id, .. } => write!(f, "Variants aggregation for id: {}", id),
+            Assemble { id, time_point, .. } => {
+                write!(f, "Assembly for id: {}, time point: {}", id, time_point)
+            }
         }
     }
 }

+ 21 - 10
src/collection/variants.rs

@@ -1,5 +1,6 @@
 use std::{fs::Metadata, path::PathBuf};
 
+use anyhow::Context;
 use glob::glob;
 use log::warn;
 use rayon::prelude::*;
@@ -18,16 +19,26 @@ pub struct VariantsCase {
 impl VariantsCase {
     pub fn new(path: PathBuf) -> anyhow::Result<Self> {
         let id = path
-            .parent()
-            .unwrap()
-            .parent()
-            .unwrap()
-            .file_name()
-            .unwrap()
-            .to_string_lossy()
-            .to_string();
-        let file_metadata = path.metadata()?;
-        Ok(VariantsCase { path, id, file_metadata })
+            .ancestors()
+            .nth(2)
+            .and_then(|p| p.file_name())
+            .and_then(|name| name.to_str())
+            .map(String::from)
+            .context(format!(
+                "Invalid path structure: unable to extract ID for {}",
+                path.display()
+            ))?;
+
+        let file_metadata = path.metadata().context(format!(
+            "Failed to read file metadata for {}",
+            path.display()
+        ))?;
+
+        Ok(VariantsCase {
+            path,
+            id,
+            file_metadata,
+        })
     }
 }
 

+ 57 - 0
src/functions/assembler.rs

@@ -0,0 +1,57 @@
+use pandora_lib_assembler::assembler::{
+    assemble_whole, spades::SpadesConfig, wtdbg2::Wtdbg2Config, AssembleConfig,
+};
+
/// Configuration for de novo assembly of scan reads.
#[derive(Debug, Clone)]
pub struct AssemblerConfig {
    /// Root of the per-case results tree.
    pub result_dir: String,
    /// Name of the per-time-point scan directory holding extracted reads.
    pub scan_dir_name: String,
    /// Name of the directory assemblies are written to.
    pub output_dir_name: String,
    /// Reference sequence dictionary path.
    /// NOTE(review): not read anywhere in this file — confirm downstream use.
    pub dict_file: String,
}
+
impl Default for AssemblerConfig {
    /// Production defaults: results under `/data/longreads_basic_pipe`,
    /// reads from `scan/`, output to `assemblies/`, chm13v2.0 dictionary.
    fn default() -> Self {
        Self {
            result_dir: "/data/longreads_basic_pipe".to_string(),
            scan_dir_name: "scan".to_string(),
            output_dir_name: "assemblies".to_string(),
            dict_file: "/data/ref/hs1/chm13v2.0.dict".to_string(),
        }
    }
}
+
/// One assembly job: a case id, a time point, and its configuration.
#[derive(Debug)]
pub struct Assembler {
    pub id: String,
    pub time_point: String,
    pub config: AssemblerConfig,
}
+
+impl Assembler {
+    pub fn new(
+        id: String,
+        time_point: String,
+        config: AssemblerConfig,
+    ) -> Self {
+        Assembler {
+            id,
+            time_point,
+            config,
+        }
+    }
+    pub fn run(&self) -> anyhow::Result<()> {
+        let case_dir = format!("{}/{}/{}", self.config.result_dir, self.id, self.time_point);
+        let scan_reads_dir = format!("{case_dir}/{}/reads", self.config.scan_dir_name);
+        let output_dir = format!("{case_dir}/assemblies");
+
+        assemble_whole(
+            &scan_reads_dir,
+            &[
+                AssembleConfig::Spades(SpadesConfig::new(&output_dir)),
+                AssembleConfig::Wtdbg2(Wtdbg2Config::new(&output_dir)),
+            ],
+            1,
+        )
+    }
+}

+ 2 - 1
src/functions/mod.rs

@@ -1,2 +1,3 @@
 pub mod whole_scan;
-
+pub mod variants;
+pub mod assembler;

+ 35 - 0
src/functions/variants.rs

@@ -0,0 +1,35 @@
+use indicatif::MultiProgress;
+use pandora_lib_variants::variants;
+
/// Configuration for the variants-aggregation task.
#[derive(Debug, Clone)]
pub struct VariantsConfig {
    /// Root of the per-case results tree.
    /// NOTE(review): not read in this file — confirm `run_pipe` uses it or
    /// whether it is dead configuration.
    pub result_dir: String,
}
+
impl Default for VariantsConfig {
    /// Production default: results under `/data/longreads_basic_pipe`.
    fn default() -> Self {
        Self {
            result_dir: "/data/longreads_basic_pipe".to_string(),
        }
    }
}
+
/// One variants-aggregation job: a case id plus its configuration.
#[derive(Debug)]
pub struct Variants {
    pub id: String,
    pub config: VariantsConfig,
}
+
+impl Variants {
+    pub fn new(id: String, config: VariantsConfig) -> Self {
+        Self { id, config }
+    }
+    pub fn run(&self) -> anyhow::Result<()> {
+        let multi = MultiProgress::default();
+        multi.suspend(|| {
+            variants::run_pipe(&self.id, &multi).unwrap();
+        });
+
+        Ok(())
+    }
+}

+ 1 - 1
src/functions/whole_scan.rs

@@ -1,6 +1,6 @@
 use pandora_lib_scan::par_whole_scan;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct WholeScanConfig {
     pub result_dir: String,
     pub scan_dir: String,

+ 9 - 0
src/lib.rs

@@ -163,4 +163,13 @@ mod tests {
         res.iter().for_each(|(id, tp, path)| println!("{id}\t{tp}\t{path}"));
         Ok(())
     }
+
+    #[test_log::test]
+    fn todo_assembler() -> anyhow::Result<()> {
+        let mut collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        collections.todo_assembler()?;
+        Ok(())
+    }
 }

Some files were not shown because too many files changed in this diff