use std::sync::{Arc, Mutex}; pub mod commands; pub mod config; pub mod modkit; pub mod callers; pub mod runners; pub mod collection; pub mod functions; pub mod helpers; pub mod variant; pub mod io; pub mod pipes; pub mod positions; pub mod annotation; pub mod cases; #[macro_use] extern crate lazy_static; // Define DOCKER_ID lock for handling Docker kill when ctrlc is pressed lazy_static! { static ref DOCKER_ID: Arc>> = Arc::new(Mutex::new(Vec::new())); } #[cfg(test)] mod tests { use std::{collections::HashMap, fs, path::Path}; use annotation::{vep::{VepLine, VEP}, Annotations}; use callers::{nanomonsv::nanomonsv_create_pon, savana::{Savana, SavanaReadCounts}, severus::{Severus, SeverusSolo}}; use collection::{bam::{counts_at, counts_ins_at, nt_pileup, WGSBam, WGSBamStats}, pod5::{Pod5, Pod5Config}, Initialize, InitializeSolo, Version}; use commands::{longphase::{LongphaseConfig, LongphaseHap, LongphaseModcallSolo, LongphasePhase}, modkit::{bed_methyl, ModkitConfig}}; use functions::assembler::{Assembler, AssemblerConfig}; use helpers::estimate_shannon_entropy; use io::bed::read_bed; use log::{error, info, warn}; use pipes::somatic::Somatic; use positions::{overlaps_par, GenomePosition, GenomeRange}; use rayon::prelude::*; use runners::Run; use variant::{variant::{Variants, VcfVariant}, variant_collection}; use self::{collection::pod5::{FlowCellCase, Pod5Collection}, commands::dorado, config::Config}; use super::*; use crate::{callers::{clairs::ClairS, deep_variant::DeepVariant, nanomonsv::{NanomonSV, NanomonSVSolo}}, collection::{bam, run_tasks, vcf::VcfCollection, Collections, CollectionsConfig}, commands::dorado::Dorado}; // export RUST_LOG="debug" fn init() { let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) .is_test(true) .try_init(); } #[test] fn it_works() { let bam_path = "/data/longreads_basic_pipe/PARACHINI/diag/PARACHINI_diag_hs1.bam"; modkit::modkit(bam_path); } #[test] fn run_dorado() -> anyhow::Result<()> { let case = FlowCellCase { id: "CONSIGNY".to_string(), time_point: "mrd".to_string(), barcode: "07".to_string(), pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".into() }; dorado::Dorado::init(case, Config::default())?.run_pipe() } #[test] fn pod5() -> anyhow::Result<()> { let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) .build(); let _ = Pod5Collection::new( "/data/run_data", "/data/flow_cells.tsv", "/data/longreads_basic_pipe", )?; // let runs = Runs::import_dir("/home/prom/store/banana-pool/run_data", "/data/flow_cells.tsv")?; Ok(()) } #[test] fn bam() -> anyhow::Result<()> { init(); let bam_collection = bam::load_bam_collection("/data/longreads_basic_pipe"); bam_collection .bams .iter() // .filter(|b| matches!(b.bam_type, BamType::Panel(_))) .for_each(|b| println!("{b:#?}")); let u = bam_collection.get("PARACHINI", "mrd"); println!("{u:#?}"); Ok(()) } #[test] fn vcf() -> anyhow::Result<()> { init(); let mut vcf_collection = VcfCollection::new("/data/longreads_basic_pipe"); vcf_collection.sort_by_id(); vcf_collection .vcfs .iter() .for_each(|v| v.println().unwrap()); Ok(()) } // pod5 view -I /data/run_data/20240903-CL/ARMEM-DG-N02_ASSJU-DG-N03/20240903_1428_1B_PAW47629_fc24c3cf/pod5/PAW47629_fc24c3cf_77b07847_0.pod5 | head -5000 | awk '{if(NR==1){print "target,"$0}else{print "subset_1.pod5,"$0}}' > /tmp/subset_ids.csv // pod5 subset /data/run_data/20240903-CL/ARMEM-DG-N02_ASSJU-DG-N03/20240903_1428_1B_PAW47629_fc24c3cf/pod5/PAW47629_fc24c3cf_77b07847_0.pod5 --csv /tmp/subset_ids.csv -o /data/test_suite/pod5/muxed/ #[test] fn mux() -> anyhow::Result<()> { init(); let result_dir = "/data/test_suite/results".to_string(); let cases = vec![ FlowCellCase { id: "test_02".to_string(), time_point: "diag".to_string(), barcode: "02".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() }, FlowCellCase { id: "test_03".to_string(), time_point: "diag".to_string(), barcode: "03".to_string(), pod_dir: "/data/test_suite/pod5/muxed".into() }, ]; cases.iter().for_each(|c| { let dir = format!("{result_dir}/{}", c.id); if Path::new(&dir).exists() { fs::remove_dir_all(dir).unwrap(); } }); let config = Config { result_dir, ..Default::default() }; Dorado::from_mux(cases, config) } // #[test_log::test] // fn clairs() -> anyhow::Result<()> { // let config = ClairSConfig { // result_dir: "/data/test".to_string(), // ..ClairSConfig::default() // }; // ClairS::new("test_a", "/data/test_data/subset.bam", "/data/test_data/subset_mrd.bam", config).run() // } #[test] fn nanomonsv() -> anyhow::Result<()> { init(); let id = "HAMROUNE"; NanomonSV::initialize(id, Config::default())?.run() } #[test] fn nanomddonsv_solo() -> anyhow::Result<()> { init(); NanomonSVSolo::initialize("BRETON", "diag", Config::default())?.run() } // cargo test run -- --nocapture; ~/run_scripts/notify_finish.sh & #[test] fn todo_all() -> anyhow::Result<()> { init(); // let config = CollectionsConfig::default(); let config = CollectionsConfig { pod_dir: "/data/run_data".to_string(), ..Default::default() }; info!("Runing todo with config: {:#?}", config); let mut collections = Collections::new(config)?; collections.todo()?; collections.tasks.iter().for_each(|t| println!("{t}")); println!("{}", collections.tasks.len()); Ok(()) } #[test] fn todo_agg() -> anyhow::Result<()> { init(); let config = CollectionsConfig::default(); info!("Runing todo with config: {:#?}", config); let collections = Collections::new(config)?; let agg_tasks = collections.todo_variants_agg()?; println!("{:#?}", agg_tasks); println!("{}", agg_tasks.len()); Ok(()) } #[test] fn run_agg() -> anyhow::Result<()> { init(); let config = CollectionsConfig { id_black_list: vec!["MANCUSO".to_string(),"HAMROUNE".to_string()], ..Default::default() }; info!("Runing todo with config: {:#?}", config); let mut collections = Collections::new(config)?; collections.tasks = collections.todo_variants_agg()?; collections.run()?; Ok(()) } // export RUST_LOG="debug" #[test] fn run_t() -> anyhow::Result<()> { init(); // let config = CollectionsConfig::default(); let config = CollectionsConfig { pod_dir: "/data/run_data".to_string(), ..Default::default() }; run_tasks(config) } // #[test_log::test] // fn bcftools_pass() { // let config = BcftoolsConfig::default(); // let id = "RICCO"; // let time = "diag"; // let caller = "DeepVariant"; // // Config::default(); // // // let (i, o) = // // let i = format!("/data/longreads_basic_pipe/{id}/{time}/nanomonsv/{id}_diag.nanomonsv.result.vcf"); // // let o = format!("/data/longreads_basic_pipe/{id}/{time}/nanomonsv/{id}_diag_nanomonsv_PASSED.vcf.gz"); // bcftools_keep_pass(&i, &o, config).unwrap(); // } #[test] fn bam_ok() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; let mut res: Vec<_> = collections.bam.by_id_completed(15.0, 10.0).iter().map(|b| { (b.id.to_string(), b.time_point.to_string(), b.path.to_str().unwrap().to_string()) }).collect(); res.sort_by_key(|b| b.1.clone()); res.sort_by_key(|b| b.0.clone()); res.iter().for_each(|(id, tp, path)| println!("{id}\t{tp}\t{path}")); Ok(()) } #[test] fn todo_assembler() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; collections.todo_assembler()?; Ok(()) } #[test] fn sv_pon() -> anyhow::Result<()> { init(); nanomonsv_create_pon(&Config::default(), "/data/ref/hs1/nanomonsv_pon.vcf.gz") } #[test] fn todo_mod() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; collections.todo_mod_pileup(); Ok(()) } #[test] fn todo_deepv() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; let tasks = collections.todo_deepvariants(); tasks.iter().for_each(|t| info!("{t}")); info!("n tasks {}", tasks.len()); Ok(()) } #[test] fn todo_clairs() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; collections.todo_clairs().iter().for_each(|t| info!("{t}")); Ok(()) } #[test] fn run_assemblers() -> anyhow::Result<()> { Assembler::new("CAMEL".to_string(), "diag".to_string(), AssemblerConfig::default()).run() } // #[test] // fn run_dmr_par() -> anyhow::Result<()> { // init(); // let collections = Collections::new( // CollectionsConfig::default() // )?; // let tasks = collections.todo_dmr_c_diag_mrd(); // tasks.iter().for_each(|t| info!("{t}")); // let len = tasks.len(); // // let pool = ThreadPoolBuilder::new().num_threads(10).build().unwrap(); // // pool.install(|| { // // tasks.par_iter().enumerate().for_each(|(i, t)| { // // let config = ModkitConfig {threads: 2, ..Default::default() }; // // if let collection::CollectionsTasks::DMRCDiagMrd { id, .. } = t { let _ = dmr_c_mrd_diag(id, &config); } // // println!("⚡ {i}/{len}"); // // }); // // }); // Ok(()) // } #[test] fn run_mod_par() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; let tasks = collections.todo_mod_pileup(); let len = tasks.len(); tasks.par_iter().enumerate().for_each(|(i, t)| { let config = ModkitConfig {threads: 2, ..Default::default() }; if let collection::CollectionsTasks::ModPileup { bam, .. } = t { let _ = bed_methyl(bam.to_owned(), &config); } println!("⚡ {i}/{len}"); }); Ok(()) } #[test] fn run_severus() -> anyhow::Result<()> { init(); Severus::initialize("CAMEL", Config::default())?.run() } #[test] fn run_severus_solo() -> anyhow::Result<()> { init(); SeverusSolo::initialize("CAMEL","diag", Config::default())?.run() } #[test] fn run_savana() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; for bam in collections.bam.by_id_completed(15.0, 10.0).iter() { let id = &bam.id; match ClairS::initialize(id, Config::default())?.run() { Ok(_) => match Savana::initialize(id, Config::default())?.run() { Ok(_) => (), Err(e) => error!("{e}"), }, Err(e) => error!("{e}"), } ; } Ok(()) } #[test] fn check_versions() -> anyhow::Result<()> { init(); let config = Config::default(); let v = Savana::version(&config)?; info!("Savanna version {v}"); let v = Severus::version(&config)?; info!("Severus version {v}"); Ok(()) } #[test] fn run_multi_deepvariant() -> anyhow::Result<()> { init(); let mut collections = Collections::new( CollectionsConfig::default() )?; collections.run_deepvariant() } #[test] fn run_deepvariant() -> anyhow::Result<()> { init(); DeepVariant::initialize("HAMROUNE", "diag", Config::default())?.run() } #[test] fn run_clairs() -> anyhow::Result<()> { init(); ClairS::initialize("ADJAGBA", Config::default())?.run() } #[test] fn run_longphase() -> anyhow::Result<()> { init(); let id = "BECERRA"; let diag_bam = format!("/data/longreads_basic_pipe/{id}/diag/{id}_diag_hs1.bam"); let vcf = format!("/data/longreads_basic_pipe/{id}/diag/ClairS/clair3_normal_tumoral_germline_output.vcf.gz"); let mrd_bam = format!("/data/longreads_basic_pipe/{id}/mrd/{id}_mrd_hs1.bam"); LongphaseHap::new(id, &diag_bam, &vcf, LongphaseConfig::default()).run()?; LongphaseHap::new(id, &mrd_bam, &vcf, LongphaseConfig::default()).run() } #[test] fn run_longphase_modcall() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let time = "diag"; LongphaseModcallSolo::initialize(id, time, Config::default())?.run() } #[test] fn run_longphase_phase() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; LongphasePhase::initialize(id, Config::default())?.run() } #[test] fn variant_parse() -> anyhow::Result<()> { let row = "chr1\t1366\t.\tC\tCCCT\t8.2\tPASS\t.\tGT:GQ:DP:AD:VAF:PL\t1/1:4:6:1,4:0.666667:6,4,0"; let variant: VcfVariant = row.parse()?; let var_string = variant.into_vcf_row(); assert_eq!(row, &var_string); let row = "chr1\t1366\t.\tC\tCCCT\t8.2\tPASS\t."; let variant: VcfVariant = row.parse()?; let var_string = variant.into_vcf_row(); assert_eq!(row, &var_string); let row = "chr1\t2628434\t.\tC\tT\t17.973\tPASS\tH;FAU=0;FCU=7;FGU=0;FTU=7;RAU=0;RCU=2;RGU=0;RTU=2\tGT:GQ:DP:AF:AD:NAF:NDP:NAD:AU:CU:GU:TU:NAU:NCU:NGU:NTU\t0/1:17:18:0.5:0,9:0:11:0,0:0:9:0:9:0:11:0:0"; let variant: VcfVariant = row.parse()?; let var_string = variant.into_vcf_row(); assert_eq!(row, &var_string); let row = "chr1\t52232\t.\tC\tCT\t18\t.\t.\tGT:GQ:DP:AD:AF\t1/.:1:24:3,5:0.208333"; let variant: VcfVariant = row.parse()?; let var_string = variant.into_vcf_row(); assert_eq!(row, &var_string); let row = "chr1\t52232\t.\tC\tCT\t18\t.\t.\tGT:GQ:DP:AD:AF\t1/1:1:24:3,5:0.208333"; let variant_b: VcfVariant = row.parse()?; assert_eq!(variant, variant_b); let row = "chr1\t475157\t.\tA\tG\t12.301\tPASS\tH;FAU=2;FCU=0;FGU=2;FTU=0;RAU=3;RCU=0;RGU=3;RTU=0\tGT:GQ:DP:AF:AD:NAF:NDP:NAD:AU:CU:GU:TU:NAU:NCU:NGU:NTU\t0/1:12:10:0.5:0,5:0.0769:13:0,1:5:0:5:0:12:0:1:0"; let variant: VcfVariant = row.parse()?; let var_string = variant.into_vcf_row(); assert_eq!(row, &var_string); Ok(()) } #[test] fn variant_load_deepvariant() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let time = "diag"; let dv = DeepVariant::initialize(id, time, Config::default())?; let annotations = Annotations::default(); let variant_collection = dv.variants(&annotations)?; println!("Deepvariant for {id} {time}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants); Ok(()) } #[test] fn variant_load_clairs() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let clairs = ClairS::initialize(id, Config::default())?; let annotations = Annotations::default(); let variant_collection = clairs.variants(&annotations)?; println!("ClairS for {id}: variants {} {}", variant_collection.variants.len(), variant_collection.vcf.n_variants); Ok(()) } #[test] fn variant_load_clairs_germline() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let clairs = ClairS::initialize(id, Config::default())?; let annotations = Annotations::default(); let germline_variant_collection = clairs.germline(&annotations)?; println!("ClairS for {id}: variants {} {}", germline_variant_collection.variants.len(), germline_variant_collection.vcf.n_variants); Ok(()) } #[test] fn pipe_somatic() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; Somatic::initialize(id, Config::default())?.run() } #[test] fn overlaps() { init(); let positions = vec![ &GenomePosition { contig: 1, position: 100 }, &GenomePosition { contig: 1, position: 150 }, &GenomePosition { contig: 1, position: 200 }, &GenomePosition { contig: 2, position: 150 }, ]; let ranges = vec![ &GenomeRange { contig: 1, range: 50..150 }, &GenomeRange { contig: 2, range: 100..200 }, ]; let parallel_overlapping_indices = overlaps_par(&positions, &ranges); assert_eq!(parallel_overlapping_indices, vec![0, 3]) } #[test] fn bed_read() -> anyhow::Result<()> { init(); let path = &Config::default().mask_bed("ADJAGBA"); let r = read_bed(path)?; println!("{}", r.len()); Ok(()) } #[test] fn bases_at() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let c = Config::default(); let chr = "chr3"; let position = 62416039; // 1-based let mut bam = rust_htslib::bam::IndexedReader::from_path(c.solo_bam(id, "diag"))?; let p = nt_pileup(&mut bam, chr, position - 1, false)?.iter().map(|e| String::from_utf8(vec![*e]).unwrap()).collect::>(); let mut counts = HashMap::new(); for item in p.iter() { *counts.entry(item.as_str()).or_insert(0) += 1; } for (key, value) in &counts { println!("{}: {}", key, value); } assert_eq!(8, *counts.get("C").unwrap()); assert_eq!(13, *counts.get("G").unwrap()); assert_eq!(6, *counts.get("D").unwrap()); let chr = "chr1"; let position = 3220; // 1-based let mut bam = rust_htslib::bam::IndexedReader::from_path(c.solo_bam(id, "mrd"))?; let p = counts_at(&mut bam, chr, position - 1)?; println!("{p:#?}"); Ok(()) } #[test] fn seq_at() -> anyhow::Result<()> { init(); let c = Config::default(); let chr = "chr3"; let position = 716766; let mut fasta_reader = noodles_fasta::indexed_reader::Builder::default().build_from_path(c.reference)?; let r = pipes::somatic::sequence_at(&mut fasta_reader, chr, position, 10)?; println!("{r} ({} {:.2})", r.len(), estimate_shannon_entropy(r.as_str())); Ok(()) } #[test] fn ins_at() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let c = Config::default(); let chr = "chr1"; let position = 52232; // 1-based like in vcf let mut bam = rust_htslib::bam::IndexedReader::from_path(c.solo_bam(id, "mrd"))?; // let p = ins_pileup(&mut bam, chr, position - 1, true)?.iter().map(|e| String::from_utf8(vec![*e]).unwrap()).collect::>(); let counts = counts_ins_at(&mut bam, chr, position -1)?; for (key, value) in &counts { println!("{}: {}", key, value); } Ok(()) } #[test] fn vep_line() -> anyhow::Result<()> { init(); let line = "chr2_1922358_-/T\tchr2:1922357-1922358\tT\tMYT1L\tNM_001303052.2\tTranscript\tintron_variant\t-\t-\t-\t-\t-\t-\tIMPACT=MODIFIER;STRAND=-1;SYMBOL=MYT1L;SOURCE=chm13v2.0_RefSeq_Liftoff_v5.1_sorted.gff3.gz;HGVSc=NM_001303052.2:c.1619-613dup"; let vep_line: VepLine = line.parse()?; println!("{vep_line:#?}"); let vep: VEP = VEP::try_from(&vep_line)?; println!("{vep:#?}"); Ok(()) } #[test] fn savana_cn() -> anyhow::Result<()> { init(); let id = "CAMARA"; // let s = SavanaCopyNumber::load_id(id, Config::default())?; let s = SavanaReadCounts::load_id(id, Config::default())?; println!("tumoral reads: {}", s.n_tumoral_reads()); println!("normal reads: {}", s.n_normal_reads()); println!("tumoral:\n{:#?}", s.norm_chr_counts()); Ok(()) } #[test] fn coverage() -> anyhow::Result<()> { init(); let id = "CAMARA"; let time = "diag"; WGSBamStats::new(id, time, Config::default())?.print(); Ok(()) } #[test] fn load_bam() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let time = "diag"; let bam_path = Config::default().solo_bam(id, time); WGSBam::new(Path::new(&bam_path).to_path_buf())?; Ok(()) } #[test] fn tar() { init(); let mut ar = tar::Archive::new(fs::File::open("/data/lto/20241030_CUNVI-DG-N20_SEBZA-DG-N21.tar").unwrap()); for file in ar.entries().unwrap() { let file = file.unwrap(); let p = String::from_utf8(file.path_bytes().into_owned()).unwrap(); if p.contains("/pod5/") { let u = Pod5::from_path(&file.path().unwrap().into_owned(), &Pod5Config::default()); println!("{p}\n{u:#?}"); } } } #[test] fn run_somatic() -> anyhow::Result<()> { init(); let collections = Collections::new( CollectionsConfig::default() )?; let bams = collections.bam.by_id_completed(15.0, 10.0); let n = bams.len(); warn!("{n} cases"); for (i, bam) in bams.iter().enumerate() { let id = &bam.id; warn!("{i}/{n} {id}"); if id == "BANGA" { continue; } match Somatic::initialize(id, Config::default())?.run() { Ok(_) => (), Err(e) => error!("{id} {e}"), }; } Ok(()) } #[test] fn run_somatic_case() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let mut config = Config::default(); config.somatic_pipe_force = true; match Somatic::initialize(id, config)?.run() { Ok(_) => (), Err(e) => error!("{id} {e}"), }; Ok(()) } #[test] fn load_variants() -> anyhow::Result<()> { init(); let id = "ADJAGBA"; let config = Config::default(); let path = format!("{}/{id}/diag//somatic_variants.json.gz", config.result_dir); let variants = variant_collection::Variants::load_from_json(&path)?; println!("n variants {}", variants.data.len()); let n_vep: usize = variants.data.iter().map(|v| v.vep().len()).sum(); println!("VEP: {n_vep}"); Ok(()) } }