Thomas 1 year ago
parent
commit
ae3d3c1ee9
6 changed files with 245 additions and 44 deletions
  1. 57 26
      src/collection/mod.rs
  2. 135 0
      src/collection/modbases.rs
  3. 1 1
      src/collection/pod5.rs
  4. 10 6
      src/commands/bcftools.rs
  5. 13 11
      src/commands/modkit.rs
  6. 29 0
      src/lib.rs

+ 57 - 26
src/collection/mod.rs

@@ -11,6 +11,7 @@ use anyhow::Context;
 use chrono::{DateTime, Utc};
 use glob::glob;
 use log::{error, info, warn};
+use modbases::ModBasesCollection;
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
@@ -20,7 +21,10 @@ use crate::{
         nanomonsv::{NanomonSV, NanomonSVConfig},
     },
     collection::pod5::FlowCellCase,
-    commands::dorado::Dorado as BasecallAlign,
+    commands::{
+        dorado::Dorado as BasecallAlign,
+        modkit::{bed_methyl, ModkitConfig},
+    },
     config::Config,
     functions::{
         assembler::{Assembler, AssemblerConfig},
@@ -30,6 +34,7 @@ use crate::{
 };
 
 pub mod bam;
+pub mod modbases;
 pub mod pod5;
 pub mod variants;
 pub mod vcf;
@@ -58,12 +63,14 @@ impl Default for CollectionsConfig {
         }
     }
 }
+
 #[derive(Debug)]
 pub struct Collections {
     pub config: CollectionsConfig,
     pub pod5: Pod5Collection,
     pub bam: BamCollection,
     pub vcf: VcfCollection,
+    pub modbases: ModBasesCollection,
     pub tasks: Vec<CollectionsTasks>,
 }
 
@@ -78,11 +85,13 @@ impl Collections {
         let pod5 = Pod5Collection::new(pod_dir, corrected_fc_path, result_dir)?;
         let bam = BamCollection::new(result_dir);
         let vcf = VcfCollection::new(result_dir);
+        let modbases = ModBasesCollection::new(result_dir);
 
         Ok(Self {
             pod5,
             bam,
             vcf,
+            modbases,
             tasks: Vec::new(),
             config,
         })
@@ -276,6 +285,9 @@ impl Collections {
         // Variants aggregation
         self.tasks.extend(self.todo_variants_agg()?);
 
+        // ModPileup
+        self.tasks.extend(self.todo_mod_pileup());
+
         // Tasks sorting
         self.tasks.sort_by_cached_key(|task| task.get_order());
 
@@ -339,6 +351,26 @@ impl Collections {
         Ok(tasks)
     }
 
+    pub fn todo_mod_pileup(&self) -> Vec<CollectionsTasks> {
+        let config = ModkitConfig::default();
+        self.bam
+            .bams
+            .iter()
+            .filter_map(|b| {
+                if self.modbases.modbases.iter().any(|mb| {
+                    mb.id == b.id && mb.time_point == b.time_point && mb.pileup_modif > b.modified
+                }) {
+                    None
+                } else {
+                    Some(CollectionsTasks::ModPileup {
+                        bam: b.path.clone(),
+                        config: config.clone(),
+                    })
+                }
+            })
+            .collect()
+    }
+
     /// Generates pairs of diagnostic and MRD BAM files.
     ///
     /// This function performs the following steps:
@@ -503,6 +535,10 @@ pub enum CollectionsTasks {
         time_point: String,
         config: AssemblerConfig,
     },
+    ModPileup {
+        bam: PathBuf,
+        config: ModkitConfig,
+    },
     DeepVariant {
         id: String,
         time_point: String,
@@ -536,48 +572,37 @@ impl CollectionsTasks {
             CollectionsTasks::DemuxAlign(cases) => {
                 BasecallAlign::from_mux(cases, Config::default())
             }
+            CollectionsTasks::ModPileup { bam, config } => bed_methyl(bam, &config),
             CollectionsTasks::DeepVariant {
                 id,
                 time_point,
                 bam,
                 config,
-            } => {
-                DeepVariant::new(&id, &time_point, &bam, config).run()
-            }
+            } => DeepVariant::new(&id, &time_point, &bam, config).run(),
             CollectionsTasks::ClairS {
                 id,
                 diag_bam,
                 mrd_bam,
                 config,
-            } => {
-                ClairS::new(&id, &diag_bam, &mrd_bam, config).run()
-            }
+            } => ClairS::new(&id, &diag_bam, &mrd_bam, config).run(),
             CollectionsTasks::NanomonSV {
                 id,
                 diag_bam,
                 mrd_bam,
                 config,
-            } => {
-                NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run()
-            }
+            } => NanomonSV::new(&id, &diag_bam, &mrd_bam, config).run(),
             CollectionsTasks::WholeScan {
                 id,
                 time_point,
                 bam,
                 config,
-            } => {
-                WholeScan::new(id, time_point, bam, config)?.run()
-            }
-            CollectionsTasks::Variants { id, config } => {
-                Variants::new(id, config).run()
-            }
+            } => WholeScan::new(id, time_point, bam, config)?.run(),
+            CollectionsTasks::Variants { id, config } => Variants::new(id, config).run(),
             CollectionsTasks::Assemble {
                 id,
                 time_point,
                 config,
-            } => {
-                Assembler::new(id, time_point, config).run()
-            }
+            } => Assembler::new(id, time_point, config).run(),
         }
     }
 
@@ -585,12 +610,13 @@ impl CollectionsTasks {
         match self {
             CollectionsTasks::Align(_) => 0,
             CollectionsTasks::DemuxAlign(_) => 1,
-            CollectionsTasks::WholeScan { .. } => 2,
-            CollectionsTasks::Assemble { .. } => 3,
-            CollectionsTasks::DeepVariant { .. } => 4,
-            CollectionsTasks::ClairS { .. } => 5,
-            CollectionsTasks::NanomonSV { .. } => 6,
-            CollectionsTasks::Variants { .. } => 7,
+            CollectionsTasks::ModPileup { .. } => 2,
+            CollectionsTasks::WholeScan { .. } => 3,
+            CollectionsTasks::Assemble { .. } => 4,
+            CollectionsTasks::DeepVariant { .. } => 5,
+            CollectionsTasks::ClairS { .. } => 6,
+            CollectionsTasks::NanomonSV { .. } => 7,
+            CollectionsTasks::Variants { .. } => 8,
         }
     }
 }
@@ -601,7 +627,11 @@ impl fmt::Display for CollectionsTasks {
         use CollectionsTasks::*;
 
         match self {
-            Align(case) => write!(f, "Alignment task for: {} {} {}", case.id, case.time_point, case.barcode),
+            Align(case) => write!(
+                f,
+                "Alignment task for: {} {} {}",
+                case.id, case.time_point, case.barcode
+            ),
             DemuxAlign(cases) => write!(
                 f,
                 "Demultiplex and alignment task for: {}",
@@ -657,6 +687,7 @@ impl fmt::Display for CollectionsTasks {
             Assemble { id, time_point, .. } => {
                 write!(f, "De novo assemblage for {} {}", id, time_point)
             }
+            ModPileup { bam, .. } => write!(f, "ModPileup for {}", bam.display()),
         }
     }
 }

+ 135 - 0
src/collection/modbases.rs

@@ -0,0 +1,135 @@
+use std::{
+    fs::{self},
+    path::PathBuf,
+    str::FromStr,
+};
+
+use anyhow::Context;
+use chrono::{DateTime, Utc};
+use log::warn;
+use serde::{Deserialize, Serialize};
+
+use glob::glob;
+use rayon::prelude::*;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct ModBases {
+    pub id: String,
+    pub time_point: String,
+    pub mod_type: ModType,
+    pub pileup: PathBuf,
+    pub pileup_modif: DateTime<Utc>,
+    pub dmr_files: Vec<PathBuf>,
+}
+
+#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
+pub enum ModType {
+    #[serde(rename = "5mC_5hmC")]
+    Mod5mC5hmC,
+}
+
+impl FromStr for ModType {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "5mC_5hmC" => Ok(ModType::Mod5mC5hmC),
+            _ => Err(anyhow::anyhow!("Unknown ModType: {}", s)),
+        }
+    }
+}
+impl ModBases {
+    pub fn new(path: PathBuf) -> anyhow::Result<Self> {
+        let dir = path
+            .parent()
+            .context(format!("Can't open the parent dir of {}", path.display()))?;
+        let filename = path
+            .file_name()
+            .context(format!("Can't parse filename for {}", path.display()))?
+            .to_string_lossy()
+            .to_string();
+
+        // /BACOU_diag_5mC_5hmC_modPileup.bed.gz
+        let parts: Vec<&str> = filename.split("_").collect();
+
+        if parts.len() < 4 {
+            anyhow::bail!("Error in modPileup filename format {}", path.display());
+        }
+
+        let id = parts.first().unwrap().to_string();
+        let time_point = parts.get(1).unwrap().to_string();
+        let modif = parts.get(2..parts.len() - 1).unwrap().join("_");
+        let mod_type = ModType::from_str(&modif)?;
+
+        let entries = fs::read_dir(dir)?;
+
+        let dmr_files: Vec<PathBuf> = entries
+            .filter_map(|entry| {
+                let entry = entry.ok()?;
+                let path = entry.path();
+
+                if path.is_file()
+                    && path
+                        .file_name()
+                        .and_then(|name| name.to_str())
+                        .map(|name| name.contains("_DMR_"))
+                        .unwrap_or(false)
+                {
+                    Some(path)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let pileup_modif: DateTime<Utc> = path
+            .metadata()
+            .context(format!("Can't access metadata for {}", path.display()))?
+            .modified()
+            .context(format!(
+                "Can't access modified date metadata for {}",
+                path.display()
+            ))?
+            .into();
+
+        Ok(Self {
+            id,
+            time_point,
+            mod_type,
+            pileup: path,
+            pileup_modif,
+            dmr_files,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct ModBasesCollection {
+    pub modbases: Vec<ModBases>,
+}
+
+impl ModBasesCollection {
+    pub fn new(result_dir: &str) -> Self {
+        load_modbases_collection(result_dir)
+    }
+}
+
+pub fn load_modbases_collection(result_dir: &str) -> ModBasesCollection {
+    let pattern = format!("{result_dir}/*/*/*/*_modPileup.bed.gz");
+    let modbases = glob(&pattern)
+        .expect("Failed to read glob pattern")
+        .par_bridge()
+        .filter_map(|entry| {
+            match entry {
+                Ok(path) => match ModBases::new(path) {
+                    Ok(modb) => return Some(modb),
+                    Err(e) => warn!("{e}"),
+                },
+                Err(e) => warn!("Error: {:?}", e),
+            }
+            None
+        })
+        .collect();
+
+    ModBasesCollection { modbases }
+}

+ 1 - 1
src/collection/pod5.rs

@@ -161,7 +161,7 @@ pub struct FlowCell {
 //     }
 // }
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct Pod5Collection {
     pub importation_date: DateTime<Utc>,
     pub runs: Vec<Run>,

+ 10 - 6
src/commands/bcftools.rs

@@ -72,17 +72,21 @@ pub fn bcftools_concat(
     ];
     // Then filter
     let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let res = run_wait(&mut cmd_run)?;
+    let res = run_wait(&mut cmd_run)
+        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
     fs::remove_file(tmp_file)?;
     Ok(res)
 }
 
-pub fn bcftools_keep_only_in_a(a: &str, b: &str, out: &str, config: &BcftoolsConfig) -> anyhow::Result<()> {
+pub fn bcftools_keep_only_in_a(
+    a: &str,
+    b: &str,
+    out: &str,
+    config: &BcftoolsConfig,
+) -> anyhow::Result<()> {
     let args = ["isec", "-C", "-w", "1", a, b, "-o", out];
     let mut cmd_run = CommandRun::new(&config.bin, &args);
-    let _ = run_wait(&mut cmd_run).context(format!(
-        "Error while running bcftools isec {}",
-        args.join(" ")
-    ))?;
+    let _ = run_wait(&mut cmd_run)
+        .context(format!("Error while running `bcftools {}`", args.join(" ")))?;
     Ok(())
 }

+ 13 - 11
src/commands/modkit.rs

@@ -1,10 +1,10 @@
-use std::{fs, path::Path};
+use std::{fs, path::PathBuf};
 
 use anyhow::Context;
 
 use crate::runners::{run_wait, CommandRun};
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ModkitConfig {
     pub bin: String,
     pub crabz_bin: String,
@@ -23,25 +23,27 @@ impl Default for ModkitConfig {
     }
 }
 
-pub fn bed_methyl(bam: &Path, config: &ModkitConfig) -> anyhow::Result<()> {
+pub fn bed_methyl(bam: PathBuf, config: &ModkitConfig) -> anyhow::Result<()> {
     let dir = bam
         .parent()
         .context(format!("No parent dir for {}", bam.display()))?;
     let meth_dir = dir.join("5mC_5hmC");
+
+    // create modified base dir
     if !meth_dir.exists() {
         fs::create_dir(&meth_dir).context(format!("Can't create {}", meth_dir.display()))?;
     }
-    let time_point_dir = dir
-        .parent()
-        .context(format!("No parent dir for {}", dir.display()))?;
-    let time_point = time_point_dir.file_name().unwrap().to_str().unwrap();
+    // let time_point_dir = dir
+    //     .parent()
+    //     .context(format!("No parent dir for {}", dir.display()))?;
+    let time_point = dir.file_name().unwrap().to_str().unwrap();
 
-    let id_dir = time_point_dir
+    let id_dir = dir
         .parent()
-        .context(format!("No parent dir for {}", time_point_dir.display()))?;
+        .context(format!("No parent dir for {}", dir.display()))?;
     let id = id_dir.file_name().unwrap().to_str().unwrap();
 
-    let pileup_bed = meth_dir.join(format!("{id}_${time_point}_5mC_5hmC_modPileup.bed"));
+    let pileup_bed = meth_dir.join(format!("{id}_{time_point}_5mC_5hmC_modPileup.bed"));
 
     let pileup_bed_str = pileup_bed.to_str().unwrap();
     let mut cmd_run = CommandRun::new(
@@ -73,4 +75,4 @@ pub fn bed_methyl(bam: &Path, config: &ModkitConfig) -> anyhow::Result<()> {
     Ok(())
 }
 
-// odkit dmr pair -m C -t 160 --ref /data/ref/hs1/chm13v2.0.fa -r /data/ref/hs1/chm13v2.0_CGI_corrected.bed -a /data/longreads_basic_pipe/BACOU/mrd/5mC_5hmC/BACOU_mrd_5mC_5hmC.bed.gz -b /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC.bed.gz -o /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC_DMR_C_mrd.bed.gz
+// modkit dmr pair -m C -t 160 --ref /data/ref/hs1/chm13v2.0.fa -r /data/ref/hs1/chm13v2.0_CGI_corrected.bed -a /data/longreads_basic_pipe/BACOU/mrd/5mC_5hmC/BACOU_mrd_5mC_5hmC.bed.gz -b /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC.bed.gz -o /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC_DMR_C_mrd.bed.gz

+ 29 - 0
src/lib.rs

@@ -22,7 +22,9 @@ mod tests {
     use std::{fs, path::Path};
 
     use callers::nanomonsv::nanomonsv_create_pon;
+    use commands::modkit::{bed_methyl, ModkitConfig};
     use log::info;
+    use rayon::prelude::*;
 
     use self::{callers::deep_variant::DeepVariantConfig, collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config};
     use super::*;
@@ -245,4 +247,31 @@ mod tests {
         init();
         nanomonsv_create_pon(NanomonSVConfig::default(), "/data/ref/hs1/nanomonsv_pon.vcf.gz")
     }
+
+    #[test]
+    fn todo_mod() -> anyhow::Result<()> {
+        init();
+        let collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        collections.todo_mod_pileup();
+        Ok(())
+    }
+
+    #[test]
+    fn run_mod_par() -> anyhow::Result<()> {
+        init();
+        let collections = Collections::new(
+            CollectionsConfig::default()
+        )?;
+        let tasks = collections.todo_mod_pileup();
+        let len = tasks.len();
+
+        tasks.par_iter().enumerate().for_each(|(i, t)| {
+            let config = ModkitConfig {threads: 2,  ..Default::default() };
+            if let collection::CollectionsTasks::ModPileup { bam, .. } = t { let _ = bed_methyl(bam.to_owned(), &config); }
+            println!("⚡ {i}/{len}");
+        });
+        Ok(())
+    }
 }