Przeglądaj źródła

gatk: modify run_mutect2_chunked to run its jobs via exec_jobs (Mutect2 + PASS filter chained per chunk)

Thomas 1 tydzień temu
rodzic
commit
f11370f0cf
5 zmienionych plików z 84 dodań i 28 usunięć
  1. 4 0
      Cargo.lock
  2. 2 2
      src/callers/clairs.rs
  3. 29 18
      src/callers/gatk.rs
  4. 48 7
      src/de_novo/de_novo_pipe.rs
  5. 1 1
      src/pipes/somatic.rs

+ 4 - 0
Cargo.lock

@@ -1376,6 +1376,8 @@ dependencies = [
 [[package]]
 name = "hts-sys"
 version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e38d7f1c121cd22aa214cb4dadd4277dc5447391eac518b899b29ba6356fbbb2"
 dependencies = [
  "bindgen",
  "bzip2-sys",
@@ -2628,6 +2630,8 @@ dependencies = [
 [[package]]
 name = "rust-htslib"
 version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f22161678c3d72e6434c5f3383325dbf88c3cacce665f0c7b4b077fc6e957ba9"
 dependencies = [
  "bio-types",
  "byteorder",

+ 2 - 2
src/callers/clairs.rs

@@ -1179,7 +1179,7 @@ mod tests {
     #[test]
     fn clairs_run() -> anyhow::Result<()> {
         test_init();
-        let id = "CHAHA";
+        let id = "DUMCO";
         let config = Config::default();
         let mut clairs = ClairS::initialize(id, &config)?;
         crate::runners::Run::run(&mut clairs)?;
@@ -1190,7 +1190,7 @@ mod tests {
     #[test]
     fn clairs_resume() -> anyhow::Result<()> {
         test_init();
-        let id = "CHAHA";
+        let id = "DUMCO";
         let config = Config::default();
         let  clairs = ClairS::initialize(id, &config)?;
 

+ 29 - 18
src/callers/gatk.rs

@@ -134,15 +134,16 @@ use crate::{
     collection::vcf::Vcf,
     commands::{
         bcftools::{BcftoolsConcat, BcftoolsIndex, BcftoolsKeepPass},
-        Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner, SlurmParams,
+        exec_jobs, AnyJob, Command as JobCommand, LocalRunner, SbatchRunner, SlurmParams,
         SlurmRunner,
     },
     config::Config,
     helpers::{is_file_older, remove_dir_if_exists, singularity_bind_flags},
     io::{bam::read_sm_tag_or_inject, bed::BedRow, vcf::read_vcf},
+    jobs_seq,
     locker::SampleLock,
     pipes::{Initialize, ShouldRun, Version},
-    run, run_many,
+    run,
     runners::Run,
     variant::{
         variant_collection::VariantCollection,
@@ -362,7 +363,7 @@ impl JobCommand for Mutect2 {
 }
 
 impl LocalRunner for Mutect2 {}
-impl LocalBatchRunner for Mutect2 {}
+// impl LocalBatchRunner for Mutect2 {}
 
 impl SlurmRunner for Mutect2 {
     fn slurm_args(&self) -> Vec<String> {
@@ -815,27 +816,36 @@ pub fn run_mutect2_chunked(id: &str, config: &Config, n_parts: usize) -> anyhow:
 
         job.bed_path = sub_bed;
         info!("Planned Mutect2 job:\n{job}");
-        jobs.push(job);
+
+        let keep_pass = BcftoolsKeepPass::from_config(
+            &job.config,
+            job.output_vcf_path(),
+            job.passed_vcf_path(),
+        );
+        jobs.push(jobs_seq![job, keep_pass]);
     }
 
+    let outputs = exec_jobs(jobs, false, 1).context("Failed to run Mutect2 chunked")?;
+
     // Run all Mutect2 jobs (local or Slurm, depending on config)
-    let outputs = run_many!(config, jobs.clone())?;
+    // let outputs = run_many!(config, jobs.clone())?;
     for output in outputs.iter() {
         output.save_to_file(format!("{}/mutect2_", base.log_dir))?;
     }
 
-    // Filter PASS variants for each part
-    info!(
-        "Filtering PASS variants for all {} Mutect2 parts",
-        actual_n_parts
-    );
-    let filter_jobs: Vec<_> = jobs
-        .iter()
-        .map(|job| {
-            BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
-        })
-        .collect();
-    run_many!(config, filter_jobs)?;
+    // // Filter PASS variants for each part
+    // info!(
+    //     "Filtering PASS variants for all {} Mutect2 parts",
+    //     actual_n_parts
+    // );
+    // let filter_jobs: Vec<_> = jobs
+    //     .iter()
+    //     .flatten()
+    //     .map(|job| {
+    //         BcftoolsKeepPass::from_config(&job.config, job.output_vcf_path(), job.passed_vcf_path())
+    //     })
+    //     .collect();
+    // run_many!(config, filter_jobs)?;
 
     // Merge all PASS VCFs
     merge_mutect2_parts(&base, actual_n_parts)?;
@@ -873,7 +883,8 @@ mod tests {
     fn mutect2_run() -> anyhow::Result<()> {
         test_init();
         let config = Config::default();
-        Run::run(&mut Mutect2::initialize("CHALO", &config)?)?;
+        Mutect2::initialize("DUMCO", &config)?.run()?;
+        // Run::run(&mut Mutect2::initialize("DUMCO", &config)?)?;
         // Mutect2::initialize("CHALO", &config)?.run()?;
         Ok(())
     }

+ 48 - 7
src/de_novo/de_novo_pipe.rs

@@ -456,16 +456,54 @@ mod tests {
     fn test_local_assembly() -> anyhow::Result<()> {
         init();
 
-        let id = "CML2518";
-        let abl_locus = ("chr9", 142_958_315, 142_958_913);
-        let bcr_locus = ("chr22", 23_713_734, 23_714_329);
+        // CHAHA
+        // let id = "CHAHA";
+        // let reads: Vec<Vec<u8>> = vec![
+        //     "80d3882e-3eba-4dad-a35a-058f2d2de701",
+        //     "20d5dbfd-9545-4008-b4ba-8b7945c5ac98",
+        //     "621d3c51-dee5-4b43-9763-ed373dbf3bbd",
+        //     "67f962a3-63e9-4f29-831d-963a30abe0dd",
+        //     "4893348b-917d-4df3-9f8f-c9e33e2a30e9",
+        //     "d249d6cb-f55e-47b7-a08d-7d21fbe5793a",
+        //     "63ccecb8-28cc-4f0e-93b2-712ff3fae469",
+        // ]
+        // .into_iter()
+        // .map(|e| e.to_string().as_bytes().to_vec())
+        // .collect();
+        // let locus = ("chr4", 146857960, 146858960);
+
+        let id = "DUMCO";
+        let reads: Vec<Vec<u8>> = vec![
+            "1ca90c3f-0ef6-4d8d-82ee-11ff20697362",
+            "6bb25dce-0002-444d-842f-85b7f4d1bb5d",
+            "b542a247-ec2f-447e-b41c-4e01b6fd137e",
+            "4fae3a60-9232-4bb9-a4de-c52aad994002",
+            "3b9b3ba1-af8d-4398-8aba-10f97efbe85d", // not sure
+        ]
+        .into_iter()
+        .map(|e| e.to_string().as_bytes().to_vec())
+        .collect();
+        let locus = ("chr11", 118502440, 118503440);
+
+        let id = "DUMCO";
+        let reads: Vec<Vec<u8>> = vec![
+            "ae6e4942-1d78-4b99-b259-8aea8a5fb2bb",
+            "8fb37629-4646-4b0d-b558-5a454bfae659",
+            "4fae3a60-9232-4bb9-a4de-c52aad994002",
+            "27d217b2-509c-450f-8edd-e0d09c93a5c0",
+            "930f9466-6a9f-44ba-9cfa-a18090236961",
+        ]
+        .into_iter()
+        .map(|e| e.to_string().as_bytes().to_vec())
+        .collect();
+        let locus = ("chr7", 47667964, 47668964);
 
         let config = Config::default();
 
         let bam_path = PathBuf::from(config.tumoral_bam(id));
         let work_dir = PathBuf::from(format!(
-            "{}/{id}/{}/asm_wtdbg2_22",
-            config.result_dir, config.tumoral_name
+            "{}/{id}/{}/asm_wtdbg2_{}_{}_{}",
+            config.result_dir, config.tumoral_name, locus.0, locus.1, locus.2
         ));
         if work_dir.exists() {
             std::fs::remove_dir_all(&work_dir)?;
@@ -473,14 +511,17 @@ mod tests {
         std::fs::create_dir_all(&work_dir)?;
 
         let assembly_config = LocalAssemblyConfig {
-            min_records: 10,
+            min_records: 3,
             case_id: id.to_string(),
             config: config.clone(),
         };
 
         // Fetch initial primary records from locus
         // let records = fetch_primary_records(&bam_path, Some(abl_locus), None)?;
-        let records = fetch_primary_records(&bam_path, Some(bcr_locus), None)?;
+        let mut records = fetch_primary_records(&bam_path, Some(locus), None)?;
+
+        info!("Fetched {} primary records from locus", records.len());
+        records.retain(|r| reads.contains(&r.qname().to_vec()));
 
         info!("Fetched {} primary records from locus", records.len());
 

+ 1 - 1
src/pipes/somatic.rs

@@ -1126,7 +1126,7 @@ mod tests {
             .num_threads(config.threads.into())
             .build()?;
 
-        pool.install(|| SomaticPipe::initialize("CHAHA", &config)?.run())?;
+        pool.install(|| SomaticPipe::initialize("DUMCO", &config)?.run())?;
         Ok(())
     }
 }