
upgraded job names with part index

Thomas 1 week ago
parent
commit
66faf6060c

+ 2 - 0
pandora-config.example.toml

@@ -13,6 +13,8 @@ tmp_dir = "/mnt/beegfs02/scratch/t_steimle/tmp"
 # Should use Slurm as runner
 slurm_runner = true
 
+# slurm_genome_chunks = 150
+
 # Run cache directory.
 run_cache_dir = "/home/t_steimle/data/prom_runs"
 

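For context, the new slurm_genome_chunks option only appears commented out in the example config. A minimal sketch of how such an option could be deserialized with serde/toml; the struct, field default, and its link to the hard-coded 50 passed to run_clairs_chunked() below are assumptions, since the project's real Config type is not part of this diff:

// Sketch only: reading slurm_genome_chunks from the TOML config.
// Struct name and default are assumptions, not the project's actual Config.
use serde::Deserialize;

#[derive(Deserialize)]
struct SlurmSection {
    slurm_runner: bool,
    // Number of genome chunks per caller when running on Slurm.
    #[serde(default = "default_genome_chunks")]
    slurm_genome_chunks: usize,
}

fn default_genome_chunks() -> usize {
    50 // assumption: mirrors the value currently hard-coded in run_clairs_chunked(id, config, 50)
}

fn main() -> anyhow::Result<()> {
    let cfg: SlurmSection = toml::from_str("slurm_runner = true\nslurm_genome_chunks = 150")?;
    assert_eq!(cfg.slurm_genome_chunks, 150);
    Ok(())
}
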
+ 24 - 11
src/callers/clairs.rs

@@ -167,15 +167,28 @@
 //! - [ClairS GitHub repository](https://github.com/HKU-BAL/ClairS)
 //! - [ClairS publication (Nature Communications, 2024)](https://doi.org/10.1038/s41467-024-52832-2)
 use crate::{
-    annotation::{Annotation, Annotations, Caller, CallerCat, Sample}, collection::vcf::Vcf, commands::{
-        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams, SlurmRunner, bcftools::{BcftoolsConcat, BcftoolsKeepPass}, samtools::SamtoolsMergeMany
-    }, config::Config, helpers::{
+    annotation::{Annotation, Annotations, Caller, CallerCat, Sample},
+    collection::vcf::Vcf,
+    commands::{
+        bcftools::{BcftoolsConcat, BcftoolsKeepPass},
+        samtools::SamtoolsMergeMany,
+        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
+        SlurmRunner,
+    },
+    config::Config,
+    helpers::{
         get_genome_sizes, is_file_older, list_files_recursive, remove_dir_if_exists,
         singularity_bind_flags, split_genome_into_n_regions_exact,
-    }, io::vcf::read_vcf, locker::SampleLock, pipes::{Initialize, ShouldRun, Version}, run, run_many, runners::Run, variant::{
+    },
+    io::vcf::read_vcf,
+    locker::SampleLock,
+    pipes::{Initialize, ShouldRun, Version},
+    run, run_many,
+    runners::Run,
+    variant::{
         variant_collection::VariantCollection,
         vcf_variant::{Label, Variants},
-    }
+    },
 };
 
 use anyhow::Context;
@@ -365,10 +378,11 @@ impl LocalBatchRunner for ClairS {}
 
 impl SlurmRunner for ClairS {
     fn slurm_args(&self) -> Vec<String> {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("clairs_{}", self.id)),
+            job_name: Some(format!("clairs_{}{}", self.id, batch_id)),
             cpus_per_task: Some(self.config.clairs_threads as u32),
-            mem: Some("60G".into()),
+            mem: Some("50G".into()),
             partition: Some("shortq".into()),
             gres: None,
         }
@@ -378,10 +392,11 @@ impl SlurmRunner for ClairS {
 
 impl SbatchRunner for ClairS {
     fn slurm_params(&self) -> SlurmParams {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("clairs_{}", self.id)),
+            job_name: Some(format!("clairs_{}{}", self.id, batch_id)),
             cpus_per_task: Some(self.config.clairs_threads as u32),
-            mem: Some("60G".into()),
+            mem: Some("50G".into()),
             partition: Some("shortq".into()),
             gres: None,
         }
@@ -400,7 +415,6 @@ impl Run for ClairS {
         let _lock = SampleLock::acquire(&lock_dir, &self.id, "clairs")
             .with_context(|| format!("Cannot start ClairS for {}", self.id))?;
 
-
         if self.config.slurm_runner {
             run_clairs_chunked(&self.id, &self.config, 50)?;
             // merge_haplotaged_tmp_bam(&self.config, &self.id)?;
@@ -863,7 +877,6 @@ pub fn run_clairs_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow::
 
     let base = ClairS::initialize(id, config)?;
 
-
     let normal_bam = config.normal_bam(id);
     let reader = bam::Reader::from_path(&normal_bam)
         .with_context(|| format!("Failed to open BAM: {normal_bam}"))?;
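
The core of this commit is the optional part-index suffix on the Slurm job name. A self-contained sketch of that pattern with stand-in values; in the hunks above the name is built from the caller struct's own id and part_index fields:

// Sketch of the job-name suffix introduced above: chunked jobs get "_<part>"
// appended, un-chunked jobs keep the plain "<caller>_<id>" name.
fn job_name(prefix: &str, id: &str, part_index: Option<usize>) -> String {
    let batch_id = part_index.map(|i| format!("_{i}")).unwrap_or_default();
    format!("{prefix}_{id}{batch_id}")
}

fn main() {
    assert_eq!(job_name("clairs", "P001", None), "clairs_P001");
    assert_eq!(job_name("clairs", "P001", Some(7)), "clairs_P001_7");
}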

+ 4 - 2
src/callers/deep_somatic.rs

@@ -284,8 +284,9 @@ impl LocalBatchRunner for DeepSomatic {}
 
 impl SlurmRunner for DeepSomatic {
     fn slurm_args(&self) -> Vec<String> {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("deepsomatic_{}", self.id)),
+            job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)),
             cpus_per_task: Some(self.config.deepsomatic_threads as u32),
             mem: Some("60G".into()),
             partition: Some("shortq".into()),
@@ -297,8 +298,9 @@ impl SlurmRunner for DeepSomatic {
 
 impl SbatchRunner for DeepSomatic {
     fn slurm_params(&self) -> SlurmParams {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("deepsomatic_{}", self.id)),
+            job_name: Some(format!("deepsomatic_{}{}", self.id, batch_id)),
             cpus_per_task: Some(self.config.deepsomatic_threads as u32),
             mem: Some("60G".into()),
             partition: Some("shortq".into()),

+ 10 - 2
src/callers/deep_variant.rs

@@ -389,8 +389,12 @@ impl LocalBatchRunner for DeepVariant {}
 
 impl SlurmRunner for DeepVariant {
     fn slurm_args(&self) -> Vec<String> {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
+            job_name: Some(format!(
+                "deepvariant_{}_{}{}",
+                self.id, self.time_point, batch_id
+            )),
             cpus_per_task: Some(self.config.deepvariant_threads.into()),
             mem: Some("40G".into()),
             partition: Some("shortq".into()),
@@ -401,8 +405,12 @@ impl SlurmRunner for DeepVariant {
 }
 impl SbatchRunner for DeepVariant {
     fn slurm_params(&self) -> SlurmParams {
+        let batch_id = self.part_index.map(|i| format!("_{i}")).unwrap_or_default();
         SlurmParams {
-            job_name: Some(format!("deepvariant_{}_{}", self.id, self.time_point)),
+            job_name: Some(format!(
+                "deepvariant_{}_{}{}",
+                self.id, self.time_point, batch_id
+            )),
             cpus_per_task: Some(self.config.deepvariant_threads.into()),
             mem: Some("40G".into()),
             partition: Some("shortq".into()),

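DeepSomatic and DeepVariant get the same suffix treatment. For readers unfamiliar with SlurmParams, the fields set in these hunks line up with standard sbatch flags; the mapping below is a hypothetical stand-in for illustration, not the crate's actual conversion code:

// Hypothetical mapping of the fields used above onto standard sbatch flags.
// SlurmParamsSketch is a stand-in; the project's real SlurmParams is not shown in this diff.
struct SlurmParamsSketch {
    job_name: Option<String>,
    cpus_per_task: Option<u32>,
    mem: Option<String>,
    partition: Option<String>,
    gres: Option<String>,
}

impl SlurmParamsSketch {
    fn to_sbatch_args(&self) -> Vec<String> {
        let mut args = Vec::new();
        if let Some(name) = &self.job_name {
            args.push(format!("--job-name={name}"));
        }
        if let Some(cpus) = self.cpus_per_task {
            args.push(format!("--cpus-per-task={cpus}"));
        }
        if let Some(mem) = &self.mem {
            args.push(format!("--mem={mem}"));
        }
        if let Some(partition) = &self.partition {
            args.push(format!("--partition={partition}"));
        }
        if let Some(gres) = &self.gres {
            args.push(format!("--gres={gres}"));
        }
        args
    }
}

fn main() {
    let params = SlurmParamsSketch {
        job_name: Some("deepvariant_P001_T1_3".into()),
        cpus_per_task: Some(8),
        mem: Some("40G".into()),
        partition: Some("shortq".into()),
        gres: None,
    };
    // --job-name=deepvariant_P001_T1_3 --cpus-per-task=8 --mem=40G --partition=shortq
    println!("{}", params.to_sbatch_args().join(" "));
}
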
+ 1 - 1
src/commands/mod.rs

@@ -818,7 +818,7 @@ where
     // }
 
     // Rule for the IGR cluster because the admins don't want to implement proper rules, so we are 
-    let max_parallel = 20;
+    let max_parallel = 30;
     let pool = rayon::ThreadPoolBuilder::new()
         .num_threads(max_parallel)
         .build()
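
The hunk above is truncated at .build(). For context, a minimal sketch of the throttling pattern it belongs to, with the actual job-submission logic replaced by a stub:

// Sketch: bounding concurrent job launches with a fixed-size rayon pool, as in the hunk above.
// The work closure here is a placeholder; the real code submits/runs Slurm jobs.
use rayon::prelude::*;

fn run_all(jobs: Vec<String>) -> anyhow::Result<()> {
    let max_parallel = 30;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(max_parallel)
        .build()?;
    pool.install(|| {
        jobs.par_iter().for_each(|job| {
            // placeholder for the actual submission logic
            println!("running {job}");
        });
    });
    Ok(())
}

fn main() -> anyhow::Result<()> {
    run_all((0..5).map(|i| format!("clairs_P001_{i}")).collect())
}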