Thomas 1 year ago
parent
commit
62265a008b
7 changed files with 203 additions and 38 deletions
  1. 6 6
      src/callers/clairs.rs
  2. 7 0
      src/collection/bam.rs
  3. 57 8
      src/collection/mod.rs
  4. 62 4
      src/collection/modbases.rs
  5. 55 1
      src/commands/modkit.rs
  6. 1 1
      src/config.rs
  7. 15 18
      src/lib.rs

+ 6 - 6
src/callers/clairs.rs

@@ -20,7 +20,7 @@ impl Default for ClairSConfig {
             result_dir: "/data/longreads_basic_pipe".to_string(),
             reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
             threads: 155,
-            platform: "ont_r10_dorado_sup_5khz".to_string(),
+            platform: "ont_r10_dorado_sup_5khz_ssrs".to_string(),
         }
     }
 }
@@ -86,15 +86,15 @@ impl ClairS {
                 &format!("{}:{}", self.output_dir, self.output_dir),
                 "hkubal/clairs:latest",
                 "/opt/bin/run_clairs",
-                "--tumor_bam_fn",
+                "-T",
                 &self.diag_bam,
-                "--normal_bam_fn",
+                "-N",
                 &self.mrd_bam,
-                "--ref_fn",
+                "-R",
                 &self.config.reference,
-                "--threads",
+                "-t",
                 &self.config.threads.to_string(),
-                "--platform",
+                "-p",
                 &self.config.platform,
                 "--enable_indel_calling",
                 "--include_all_ctgs",

+ 7 - 0
src/collection/bam.rs

@@ -185,6 +185,13 @@ impl BamCollection {
             .cloned()
             .collect()
     }
+
+    pub fn ids(&self) -> Vec<String> {
+        let mut ids: Vec<String> = self.bams.iter().map(|b| b.id.clone()).collect();
+        ids.sort();
+        ids.dedup();
+        ids
+    }
 }
 
 pub fn load_bam_collection(result_dir: &str) -> BamCollection {

+ 57 - 8
src/collection/mod.rs

@@ -11,7 +11,7 @@ use anyhow::Context;
 use chrono::{DateTime, Utc};
 use glob::glob;
 use log::{error, info, warn};
-use modbases::ModBasesCollection;
+use modbases::{Dmr, ModBasesCollection, ModType};
 
 use self::{bam::BamCollection, pod5::Pod5Collection, vcf::VcfCollection};
 use crate::{
@@ -23,7 +23,7 @@ use crate::{
     collection::pod5::FlowCellCase,
     commands::{
         dorado::Dorado as BasecallAlign,
-        modkit::{bed_methyl, ModkitConfig},
+        modkit::{bed_methyl, dmr_c_mrd_diag, ModkitConfig},
     },
     config::Config,
     functions::{
@@ -288,6 +288,9 @@ impl Collections {
         // ModPileup
         self.tasks.extend(self.todo_mod_pileup());
 
+        // DMR C diag vs mrd
+        self.tasks.extend(self.todo_dmr_c_diag_mrd());
+
         // Tasks sorting
         self.tasks.sort_by_cached_key(|task| task.get_order());
 
@@ -371,6 +374,45 @@ impl Collections {
             .collect()
     }
 
+    pub fn todo_dmr_c_diag_mrd(&self) -> Vec<CollectionsTasks> {
+        let config = ModkitConfig::default();
+        self.bam
+            .ids()
+            .iter()
+            .filter_map(|id| {
+                if let Ok((diag, mrd)) = self.modbases.get_diag_mrd(id, ModType::Mod5mC5hmC) {
+                    let dmr: Vec<&Dmr> = diag
+                        .dmr_files
+                        .iter()
+                        .filter(|dmr| dmr.base == "C" && dmr.vs == "mrd")
+                        .collect();
+
+                    if dmr.len() == 1 {
+                        let dmr = dmr.first().unwrap();
+                        if let Ok(metadata) = dmr.path.metadata() {
+                            if let Ok(modif) = metadata.modified() {
+                                let m: DateTime<Utc> = modif.into();
+                                if diag.pileup_modif > m || mrd.pileup_modif > m {
+                                    return Some(CollectionsTasks::DMRCDiagMrd {
+                                        id: id.clone(),
+                                        config: config.clone(),
+                                    });
+                                }
+                            }
+                        }
+                    }
+
+                    Some(CollectionsTasks::DMRCDiagMrd {
+                        id: id.clone(),
+                        config: config.clone(),
+                    })
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
     /// Generates pairs of diagnostic and MRD BAM files.
     ///
     /// This function performs the following steps:
@@ -539,6 +581,10 @@ pub enum CollectionsTasks {
         bam: PathBuf,
         config: ModkitConfig,
     },
+    DMRCDiagMrd {
+        id: String,
+        config: ModkitConfig,
+    },
     DeepVariant {
         id: String,
         time_point: String,
@@ -603,6 +649,7 @@ impl CollectionsTasks {
                 time_point,
                 config,
             } => Assembler::new(id, time_point, config).run(),
+            CollectionsTasks::DMRCDiagMrd { id, config } => dmr_c_mrd_diag(&id, &config),
         }
     }
 
@@ -611,12 +658,13 @@ impl CollectionsTasks {
             CollectionsTasks::Align(_) => 0,
             CollectionsTasks::DemuxAlign(_) => 1,
             CollectionsTasks::ModPileup { .. } => 2,
-            CollectionsTasks::WholeScan { .. } => 3,
-            CollectionsTasks::Assemble { .. } => 4,
-            CollectionsTasks::DeepVariant { .. } => 5,
-            CollectionsTasks::ClairS { .. } => 6,
-            CollectionsTasks::NanomonSV { .. } => 7,
-            CollectionsTasks::Variants { .. } => 8,
+            CollectionsTasks::DMRCDiagMrd { .. } => 3,
+            CollectionsTasks::WholeScan { .. } => 4,
+            CollectionsTasks::Assemble { .. } => 5,
+            CollectionsTasks::DeepVariant { .. } => 6,
+            CollectionsTasks::ClairS { .. } => 7,
+            CollectionsTasks::NanomonSV { .. } => 8,
+            CollectionsTasks::Variants { .. } => 9,
         }
     }
 }
@@ -688,6 +736,7 @@ impl fmt::Display for CollectionsTasks {
                 write!(f, "De novo assemblage for {} {}", id, time_point)
             }
             ModPileup { bam, .. } => write!(f, "ModPileup for {}", bam.display()),
+            DMRCDiagMrd { id, .. } => write!(f, "DMR C methylation diag vs mrd for {id}"),
         }
     }
 }

+ 62 - 4
src/collection/modbases.rs

@@ -19,7 +19,7 @@ pub struct ModBases {
     pub mod_type: ModType,
     pub pileup: PathBuf,
     pub pileup_modif: DateTime<Utc>,
-    pub dmr_files: Vec<PathBuf>,
+    pub dmr_files: Vec<Dmr>,
 }
 
 #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
@@ -38,6 +38,7 @@ impl FromStr for ModType {
         }
     }
 }
+
 impl ModBases {
     pub fn new(path: PathBuf) -> anyhow::Result<Self> {
         let dir = path
@@ -49,7 +50,6 @@ impl ModBases {
             .to_string_lossy()
             .to_string();
 
-        // /BACOU_diag_5mC_5hmC_modPileup.bed.gz
         let parts: Vec<&str> = filename.split("_").collect();
 
         if parts.len() < 4 {
@@ -63,7 +63,7 @@ impl ModBases {
 
         let entries = fs::read_dir(dir)?;
 
-        let dmr_files: Vec<PathBuf> = entries
+        let dmr_files: Vec<Dmr> = entries
             .filter_map(|entry| {
                 let entry = entry.ok()?;
                 let path = entry.path();
@@ -75,7 +75,21 @@ impl ModBases {
                         .map(|name| name.contains("_DMR_"))
                         .unwrap_or(false)
                 {
-                    Some(path)
+                    let filename = path
+                        .file_stem()
+                        .unwrap_or_default()
+                        .to_string_lossy()
+                        .to_string();
+                    match Dmr::from_str(&filename) {
+                        Ok(mut dmr) => {
+                            dmr.path = path.clone();
+                            Some(dmr)
+                        }
+                        Err(e) => {
+                            warn!("{e}");
+                            None
+                        }
+                    }
                 } else {
                     None
                 }
@@ -112,6 +126,26 @@ impl ModBasesCollection {
     pub fn new(result_dir: &str) -> Self {
         load_modbases_collection(result_dir)
     }
+
+    pub fn get_id(&self, id: &str, modif: ModType) -> Vec<&ModBases> {
+        self.modbases
+            .iter()
+            .filter(|m| m.id == id && m.mod_type == modif)
+            .collect()
+    }
+
+    pub fn get_diag_mrd(&self, id: &str, modif: ModType) -> anyhow::Result<(&ModBases, &ModBases)> {
+        let v = self.get_id(id, modif);
+        if v.len() != 2 {
+            anyhow::bail!("Error not found or too many files");
+        } else {
+            match (v[0].time_point.as_str(), v[1].time_point.as_str()) {
+                ("diag", "mrd") => Ok((v[0], v[1])),
+                ("mrd", "diag") => Ok((v[1], v[0])),
+                _ => anyhow::bail!("Error in DMR files"),
+            }
+        }
+    }
 }
 
 pub fn load_modbases_collection(result_dir: &str) -> ModBasesCollection {
@@ -133,3 +167,27 @@ pub fn load_modbases_collection(result_dir: &str) -> ModBasesCollection {
 
     ModBasesCollection { modbases }
 }
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct Dmr {
+    pub base: String,
+    pub vs: String,
+    pub path: PathBuf,
+}
+
+impl FromStr for Dmr {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if !s.contains("_DMR_") {
+            anyhow::bail!("Error paring DMR not `_DMR_` in filename {s}");
+        }
+
+        let spl: Vec<&str> = s.split("_").collect();
+        Ok(Self {
+            base: spl[spl.len() - 2].to_string(),
+            vs: spl[spl.len() - 1].to_string(),
+            path: PathBuf::default(),
+        })
+    }
+}

+ 55 - 1
src/commands/modkit.rs

@@ -9,7 +9,10 @@ pub struct ModkitConfig {
     pub bin: String,
     pub crabz_bin: String,
     pub tabix_bin: String,
+    pub reference: String,
+    pub cgi: String,
     pub threads: u8,
+    pub result_dir: String,
 }
 
 impl Default for ModkitConfig {
@@ -18,7 +21,10 @@ impl Default for ModkitConfig {
             bin: "modkit".to_string(),
             crabz_bin: "crabz".to_string(),
             tabix_bin: "tabix".to_string(),
+            reference: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+            cgi: "/data/ref/hs1/chm13v2.0_CGI_corrected.bed".to_string(),
             threads: 155,
+            result_dir: "/data/longreads_basic_pipe".to_string(),
         }
     }
 }
@@ -75,4 +81,52 @@ pub fn bed_methyl(bam: PathBuf, config: &ModkitConfig) -> anyhow::Result<()> {
     Ok(())
 }
 
-// modkit dmr pair -m C -t 160 --ref /data/ref/hs1/chm13v2.0.fa -r /data/ref/hs1/chm13v2.0_CGI_corrected.bed -a /data/longreads_basic_pipe/BACOU/mrd/5mC_5hmC/BACOU_mrd_5mC_5hmC.bed.gz -b /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC.bed.gz -o /data/longreads_basic_pipe/BACOU/diag/5mC_5hmC/BACOU_diag_5mC_5hmC_DMR_C_mrd.bed.gz
+pub fn modkit_dmr(
+    a: &str,
+    b: &str,
+    out: &str,
+    base: &str,
+    config: &ModkitConfig,
+) -> anyhow::Result<()> {
+    let args = [
+        "dmr",
+        "pair",
+        "-m",
+        base,
+        "--ref",
+        &config.reference,
+        "-r",
+        &config.cgi,
+        "-t",
+        &config.threads.to_string(),
+        "-a",
+        a,
+        "-b",
+        b,
+        "-o",
+        out,
+    ];
+    let mut cmd_run = CommandRun::new(&config.bin, &args);
+    run_wait(&mut cmd_run).context(format!("Error while running modkit {}", args.join(" ")))?;
+
+    let mut cmd_tabix = CommandRun::new(&config.tabix_bin, &[out]);
+    let _ = run_wait(&mut cmd_tabix).context(format!("Error while runnig tabix for {out}"))?;
+    Ok(())
+}
+
+pub fn dmr_c_mrd_diag(id: &str, config: &ModkitConfig) -> anyhow::Result<()> {
+    let mrd = format!(
+        "{}/{id}/mrd/5mC_5hmC/{id}_mrd_5mC_5hmC_modPileup.bed.gz",
+        config.result_dir
+    );
+    let diag = format!(
+        "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_modPileup.bed.gz",
+        config.result_dir
+    );
+    let out = format!(
+        "{}/{id}/diag/5mC_5hmC/{id}_diag_5mC_5hmC_DMR_C_mrd.bed.gz",
+        config.result_dir
+    );
+
+    modkit_dmr(&mrd, &diag, &out, "C", config)
+}

+ 1 - 1
src/config.rs

@@ -28,7 +28,7 @@ pub struct AlignConfig {
 impl Default for AlignConfig {
     fn default() -> Self {
         Self {
-            dorado_bin: "/data/tools/dorado-0.8.1-linux-x64/bin/dorado".to_string(),
+            dorado_bin: "/data/tools/dorado-0.8.2-linux-x64/bin/dorado".to_string(),
             dorado_basecall_arg: "-x 'cuda:0,1,2,3' sup,5mC_5hmC".to_string(), // since v0.8.0 need
             // to specify cuda devices (exclude the T1000)
             ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),

+ 15 - 18
src/lib.rs

@@ -36,7 +36,6 @@ mod tests {
             .try_init();
     }
 
-
     // export RUST_LOG="debug"
     #[test]
     fn it_works() {
@@ -143,14 +142,14 @@ mod tests {
     #[test]
     fn nanomonsv_solo() -> anyhow::Result<()> {
         init();
-        // let id = "CAZIER";
-        // let time_point = "diag";
-        // NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
-        let time_point = "mrd";
-        for id in ["MERY", "CAMARA", "FRANIATTE", "FERATI", "IQBAL", "COLLE", "JOLIVET", "BAFFREAU", "MANCUS", "BELARBI", "BENGUIRAT", "HENAUX", "MEDDAH"] {
-
+        let id = "GILLOUX";
+        let time_point = "diag";
         NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
-        }
+        // let time_point = "mrd";
+        // for id in ["MERY", "CAMARA", "FRANIATTE", "FERATI", "IQBAL", "COLLE", "JOLIVET", "BAFFREAU", "MANCUS", "BELARBI", "BENGUIRAT", "HENAUX", "MEDDAH"] {
+        //
+        // NanomonSVSolo::new(id, &format!("/data/longreads_basic_pipe/{id}/{time_point}/{id}_{time_point}_hs1.bam"), time_point, NanomonSVConfig::default()).run()?;
+        // }
         Ok(())
     }
 
@@ -158,12 +157,11 @@ mod tests {
     #[test]
     fn todo_all() -> anyhow::Result<()> {
         init();
-        let mut collections = Collections::new(
-            CollectionsConfig::default()
-        )?;
-        info!("Runing todo with config: {:#?}", collections);
+        let config = CollectionsConfig::default();
+        info!("Runing todo with config: {:#?}", config);
+        let mut collections = Collections::new(config)?;
         collections.todo()?;
-        println!("{:#?}", collections.tasks);
+        collections.tasks.iter().for_each(|t| println!("{t}"));
         println!("{}", collections.tasks.len());
         Ok(())
     }
@@ -171,10 +169,9 @@ mod tests {
     #[test]
     fn todo_agg() -> anyhow::Result<()> {
         init();
-        let collections = Collections::new(
-            CollectionsConfig::default()
-        )?;
-        info!("Runing todo with config: {:#?}", collections);
+        let config = CollectionsConfig::default();
+        info!("Runing todo with config: {:#?}", config);
+        let collections = Collections::new(config)?;
         let agg_tasks = collections.todo_variants_agg()?;
         println!("{:#?}", agg_tasks);
         println!("{}", agg_tasks.len());
@@ -188,8 +185,8 @@ mod tests {
             id_black_list: vec!["MANCUSO".to_string(),"HAMROUNE".to_string()],
             ..Default::default()
         };
+        info!("Runing todo with config: {:#?}", config);
         let mut collections = Collections::new(config)?;
-        info!("Runing todo with config: {:#?}", collections);
         collections.tasks = collections.todo_variants_agg()?;
         collections.run()?;