Thomas 1 год назад
Родитель
Commit
ecc6803c77

+ 8 - 19
src/annotation/mod.rs

@@ -226,32 +226,21 @@ impl Annotations {
                 false
             }
         });
-
-        // let keys: Vec<u128> = self
-        //     .store
-        //     .par_iter()
-        //     .filter(|entry| filter(entry.value()))
-        //     .map(|entry| *entry.key())
-        //     .collect();
         info!("{} unique Variants to keep", keys.len());
 
-        // info!("Removing annotations");
-        // self.store.retain(|key, _| keys.contains(key));
-
         info!("Removing variants from collections");
         let n_removed: usize = variants
             .par_iter_mut()
             .map(|c| {
                 let before = c.variants.len();
-                c.variants = c
-                    .variants
-                    .par_iter()
-                    .filter(|a| keys.contains(&a.hash()))
-                    // .filter(|a| keys.par_iter().any(|k| k == &a.hash_variant()))
-                    .cloned()
-                    .collect();
-                // c.variants
-                //     .retain(|a| keys.par_iter().any(|k| k == &a.hash_variant()));
+                c.variants.retain(|a| keys.contains(&a.hash()));
+
+                // c.variants = c
+                //     .variants
+                //     .par_iter()
+                //     .filter(|a| keys.contains(&a.hash()))
+                //     .cloned()
+                //     .collect();
                 let after = c.variants.len();
                 info!("{} {}\t{}/{}", c.caller, c.category, before - after, before);
                 before - after

+ 27 - 8
src/callers/clairs.rs

@@ -12,7 +12,7 @@ use crate::{
     },
 };
 use anyhow::{Context, Ok};
-use log::info;
+use log::{debug, info};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{fs, path::Path};
 
@@ -117,7 +117,7 @@ impl Run for ClairS {
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
         } else {
-            info!("ClairS VCFs exist.");
+            debug!("ClairS VCFs exist.");
         }
 
         // Germline
@@ -138,7 +138,7 @@ impl Run for ClairS {
 
             // fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
         } else {
-            info!("ClairS germline VCF exists.");
+            debug!("ClairS germline VCF exists.");
         }
 
         if !Path::new(&self.vcf_passed).exists() {
@@ -174,7 +174,7 @@ impl Run for ClairS {
 
             fs::remove_file(&tmp_file).context(format!("Can't remove tmp file {tmp_file}"))?;
         } else {
-            info!("ClairS PASSED vcf exists.");
+            debug!("ClairS PASSED vcf exists.");
         }
 
         Ok(())
@@ -192,14 +192,20 @@ impl Variants for ClairS {
         let (caller, category) = self.caller_cat();
         let add = vec![Annotation::Callers(caller.clone()), category.clone()];
         info!(
-            "Loading variant from ClairS {} with annotations: {:?}",
-            self.id, add
+            "Loading variants from {} {}: {}",
+            caller, category, self.vcf_passed
         );
         let variants = read_vcf(&self.vcf_passed)?;
-
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
         });
+        info!(
+            "{} {}, {} variants loaded.",
+            caller,
+            category,
+            variants.len()
+        );
+
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
@@ -211,11 +217,24 @@ impl Variants for ClairS {
 
 impl ClairS {
     pub fn germline(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
-        let add = vec![Annotation::Callers(Caller::ClairS), Annotation::Germline];
+        let caller = Caller::ClairS;
+        let category = Annotation::Germline;
+        let add = vec![Annotation::Callers(caller.clone()), category.clone()];
+        info!(
+            "Loading variants from {} {}: {}",
+            caller, category, self.clair3_germline_passed
+        );
+
         let variants = read_vcf(&self.clair3_germline_passed)?;
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
         });
+        info!(
+            "{} {}, {} variants loaded.",
+            caller,
+            category,
+            variants.len()
+        );
 
         Ok(VariantCollection {
             variants,

+ 10 - 5
src/callers/deep_variant.rs

@@ -142,18 +142,24 @@ impl CallerCat for DeepVariant {
     }
 }
 
-
-
 impl Variants for DeepVariant {
     fn variants(&self, annotations: &Annotations) -> anyhow::Result<VariantCollection> {
         let (caller, category) = self.caller_cat();
         let add = vec![Annotation::Callers(caller.clone()), category.clone()];
-        info!("Loading variant from DeepVariant {} {} with annotations: {:?}", self.id, self.time, add);
+        info!(
+            "Loading variants from {} {}: {}",
+            caller, category, self.vcf_passed
+        );
         let variants = read_vcf(&self.vcf_passed)?;
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
         });
-
+        info!(
+            "{} {}, {} variants loaded.",
+            caller,
+            category,
+            variants.len()
+        );
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
@@ -164,4 +170,3 @@ impl Variants for DeepVariant {
 }
 
 impl RunnerVariants for DeepVariant {}
-

+ 20 - 25
src/callers/nanomonsv.rs

@@ -6,7 +6,7 @@ use std::{
 };
 
 use anyhow::Context;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
 
 use crate::{
     annotation::{Annotation, Annotations, Caller, CallerCat},
@@ -69,7 +69,6 @@ impl Run for NanomonSV {
     fn run(&mut self) -> anyhow::Result<()> {
         somatic_parse(self)?;
 
-        info!("Nanomonsv Get");
         let diag_out_prefix = format!("{}/{}_diag", self.diag_out_dir, self.id);
         let mrd_out_prefix = format!("{}/{}_mrd", self.mrd_out_dir, self.id);
 
@@ -77,30 +76,17 @@ impl Run for NanomonSV {
         let mrd_result_vcf = format!("{mrd_out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&mrd_result_vcf).exists() {
+            info!("Nanomonsv get from normal bam: {}.", self.mrd_bam);
             let report = nanomonsv_get(&self.mrd_bam, &mrd_out_prefix, None, None, &self.config)
                 .context(format!(
                     "Error while running NanomonSV get for {mrd_result_vcf}"
                 ))?;
-            report
-                .save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))
-                .unwrap();
+            report.save_to_file(&format!("{}/nanomonsv_get_mrd_", self.log_dir))?;
         }
 
         if !Path::new(&diag_result_vcf).exists() {
-            let report = nanomonsv_get(
-                &self.diag_bam,
-                &diag_out_prefix,
-                Some(&self.mrd_bam),
-                Some(&mrd_out_prefix),
-                &self.config,
-            )
-            .context(format!(
-                "Error while running NanomonSV get for {diag_result_vcf}"
-            ))?;
-            report.save_to_file(&format!("{}/nanomonsv_get_diag_", self.log_dir,))?;
-        }
+            info!("NanomonSV get from tumoral bam: {}.", self.diag_bam);
 
-        if !Path::new(&diag_result_vcf).exists() {
             let report = nanomonsv_get(
                 &self.diag_bam,
                 &diag_out_prefix,
@@ -141,14 +127,21 @@ impl Variants for NanomonSV {
         let (caller, category) = self.caller_cat();
         let add = vec![Annotation::Callers(caller.clone()), category.clone()];
         info!(
-            "Loading variant from NanomonSV {} with annotations: {:?}",
-            self.id, add
+            "Loading variants from {} {}: {}",
+            caller, category, self.vcf_passed
         );
+
         let variants = read_vcf(&self.vcf_passed)?;
 
         variants.par_iter().for_each(|v| {
             annotations.insert_update(v.hash(), &add);
         });
+        info!(
+            "{} {}, {} variants loaded.",
+            caller,
+            category,
+            variants.len()
+        );
         Ok(VariantCollection {
             variants,
             vcf: Vcf::new(self.vcf_passed.clone().into())?,
@@ -220,10 +213,10 @@ impl Run for NanomonSVSolo {
         }
 
         // Get
-        info!("Nanomonsv Get");
         let result_vcf = format!("{out_prefix}.nanomonsv.result.vcf");
 
         if !Path::new(&result_vcf).exists() {
+            info!("Nanomonsv Get");
             let report = nanomonsv_get(&self.bam, &out_prefix, None, None, &self.config).context(
                 format!("Error while running NanomonSV get for {result_vcf}"),
             )?;
@@ -307,7 +300,6 @@ pub fn nanomonsv_parse(bam: &str, out_prefix: &str, config: &Config) -> anyhow::
 }
 
 pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
-    info!("Nanomonsv Parse");
     let diag_out_prefix = format!("{}/{}_diag", s.diag_out_dir, s.id);
     let mrd_out_prefix = format!("{}/{}_mrd", s.mrd_out_dir, s.id);
 
@@ -317,6 +309,7 @@ pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
     let mut threads_handles = Vec::new();
     if !Path::new(&diag_info_vcf).exists() {
         let diag_bam = s.diag_bam.clone();
+        info!("Nanomonsv parse {diag_bam}.");
         let diag_out_prefix = diag_out_prefix.clone();
         let config = s.config.clone();
         let log_dir = s.log_dir.clone();
@@ -328,12 +321,14 @@ pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
         });
         threads_handles.push(handle);
     } else {
-        info!("Nanonmonsv parse already exists: {diag_info_vcf}");
+        debug!("Nanomonsv parse results already exist: {diag_info_vcf}");
     }
 
     if !Path::new(&mrd_info_vcf).exists() {
-        let mrd_out_prefix = mrd_out_prefix.clone();
         let mrd_bam = s.mrd_bam.clone();
+        info!("Nanomonsv parse {mrd_bam}.");
+        let mrd_out_prefix = mrd_out_prefix.clone();
+
         let config = s.config.clone();
         let log_dir = s.log_dir.clone();
         let handle = thread::spawn(move || {
@@ -344,7 +339,7 @@ pub fn somatic_parse(s: &NanomonSV) -> anyhow::Result<()> {
         });
         threads_handles.push(handle);
     } else {
-        info!("Nanonmonsv parse already exists: {diag_info_vcf}");
+        debug!("Nanomonsv parse results already exist: {mrd_info_vcf}");
     }
 
     // Wait for all threads to finish

+ 42 - 34
src/callers/severus.rs

@@ -12,7 +12,7 @@ use crate::{
 };
 use anyhow::Context;
 use rayon::prelude::*;
-use log::info;
+use log::{debug, info};
 use std::{fs, path::Path};
 
 #[derive(Debug)]
@@ -206,41 +206,47 @@ impl Run for SeverusSolo {
 
         let output_vcf = &self.config.severus_solo_output_vcf(id, time);
         let passed_vcf = &self.config.severus_solo_passed_vcf(id, time);
+        
+        if !Path::new(output_vcf).exists() {
+            // Run command if output VCF doesn't exist
+            let severus_args = [
+                "--target-bam",
+                &self.config.solo_bam(id, time),
+                "--out-dir",
+                &self.config.severus_solo_output_dir(id, time),
+                "-t",
+                &self.config.severus_threads.to_string(),
+                "--write-alignments",
+                "--use-supplementary-tag",
+                "--resolve-overlaps",
+                "--between-junction-ins",
+                "--vntr-bed",
+                &self.config.vntrs_bed,
+            ];
+            let args = [
+                "-c",
+                &format!(
+                    "source {} && conda activate severus_env && {} {}",
+                    self.config.conda_sh,
+                    self.config.severus_bin,
+                    severus_args.join(" ")
+                ),
+            ];
+            let mut cmd_run = CommandRun::new("bash", &args);
+            let report = run_wait(&mut cmd_run).context(format!(
+                "Error while running `severus.py {}`",
+                args.join(" ")
+            ))?;
 
-        // Run command if output VCF doesn't exist
-        let severus_args = [
-            "--target-bam",
-            &self.config.solo_bam(id, time),
-            "--out-dir",
-            &self.config.severus_solo_output_dir(id, time),
-            "-t",
-            &self.config.severus_threads.to_string(),
-            "--write-alignments",
-            "--use-supplementary-tag",
-            "--resolve-overlaps",
-            "--between-junction-ins",
-            "--vntr-bed",
-            &self.config.vntrs_bed,
-        ];
-        let args = [
-            "-c",
-            &format!(
-                "source {} && conda activate severus_env && {} {}",
-                self.config.conda_sh,
-                self.config.severus_bin,
-                severus_args.join(" ")
-            ),
-        ];
-        let mut cmd_run = CommandRun::new("bash", &args);
-        let report = run_wait(&mut cmd_run).context(format!(
-            "Error while running `severus.py {}`",
-            args.join(" ")
-        ))?;
+            let log_file = format!("{}/severus_", self.log_dir);
+            report
+                .save_to_file(&log_file)
+                .context(format!("Error while writing logs into {log_file}"))?;
+
+        } else {
+            debug!("Severus output vcf already exists.");
+        }
 
-        let log_file = format!("{}/severus_", self.log_dir);
-        report
-            .save_to_file(&log_file)
-            .context(format!("Error while writing logs into {log_file}"))?;
 
         // Keep PASS
         if !Path::new(passed_vcf).exists() && Path::new(output_vcf).exists() {
@@ -252,6 +258,8 @@ impl Run for SeverusSolo {
             report
                 .save_to_file(&log_file)
                 .context(format!("Error while writing logs into {log_file}"))?;
+        } else {
+            debug!("VCF PASSED already exists for Severus.");
         }
         Ok(())
     }

+ 16 - 25
src/helpers.rs

@@ -63,21 +63,21 @@ pub fn path_prefix(out: &str) -> anyhow::Result<String> {
 }
 
 pub fn force_or_not(path: &str, force: bool) -> anyhow::Result<()> {
-    let path = Path::new(path);
-    let mut output_exists = path.exists();
-    let dir = path
-        .parent()
-        .context(format!("Can't parse the parent dir of {}", path.display()))?;
-
-    if force && output_exists {
-        fs::remove_dir_all(dir)?;
-        fs::create_dir_all(dir)?;
-        output_exists = false;
-    }
-
-    if output_exists {
-        warn!("{} already exists.", path.display())
-    }
+    // let path = Path::new(path);
+    // let mut output_exists = path.exists();
+    // let dir = path
+    //     .parent()
+    //     .context(format!("Can't parse the parent dir of {}", path.display()))?;
+    //
+    // if force && output_exists {
+    //     fs::remove_dir_all(dir)?;
+    //     fs::create_dir_all(dir)?;
+    //     output_exists = false;
+    // }
+    //
+    // if output_exists {
+    //     info!("{} already exists.", path.display())
+    // }
     Ok(())
 }
 
@@ -319,6 +319,7 @@ impl Hasher for Blake3Hash {
     }
 }
 
+/// Default Hasher 
 #[derive(Default, Clone)]
 pub struct Blake3BuildHasher;
 
@@ -333,15 +334,12 @@ impl BuildHasher for Blake3BuildHasher {
 #[derive(PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Debug)]
 pub struct Hash128([u8; 16]);
 
-// use std::hash::{Hash, Hasher};
-
 impl std::hash::Hash for Hash128 {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         state.write(&self.0);
     }
 }
 
-
 impl Hash128 {
     pub fn new(bytes: [u8; 16]) -> Self {
         Hash128(bytes)
@@ -350,10 +348,3 @@ impl Hash128 {
         self.0
     }
 }
-
-// impl Hash for Hash128 {
-//     fn hash<H: Hasher>(&self, state: &mut H) {
-//         state.write(&self.0);
-//     }
-// }
-//

+ 2 - 2
src/io/dict.rs

@@ -1,10 +1,10 @@
 use std::fs;
 use anyhow::{Context, Ok, Result};
 use csv::ReaderBuilder;
-use log::info;
+use log::debug;
 
 pub fn read_dict(path: &str) -> Result<Vec<(String, u32)>> {
-    info!("Parsing {path}.");
+    debug!("Parsing {path}.");
 
     let mut reader = ReaderBuilder::new()
         .delimiter(b'\t')

+ 6 - 0
src/io/vcf.rs

@@ -69,6 +69,12 @@ pub fn vcf_header(dict: &str) -> anyhow::Result<Vec<String>> {
     // Format
     // Filter
     // Info
+    header.push(
+        "##INFO=<ID=SVTYPE,Number=1,Type=String,Description=\"Type of structural variant\">"
+            .to_string(),
+    );
+    header.push("##INFO=<ID=SVLEN,Number=1,Type=Integer,Description=\"Difference in length between REF and ALT alleles\">".to_string());
+    header.push("##INFO=<ID=END,Number=1,Type=Integer,Description=\"End position of the variant described in this record\">".to_string());
 
     // version
     header.push(format!("##Pandora_version={}", env!("CARGO_PKG_VERSION")));

+ 2 - 2
src/pipes/somatic.rs

@@ -128,7 +128,7 @@ impl Run for Somatic {
         annotations.callers_stat();
 
         // Filter: Variants neither Germline nor SoloConstit
-        info!("Keeping somatic variants (variants neither in solo nor in germline called).");
+        info!("Keeping somatic variants (variants neither in solo nor in germline).");
         somatic_stats.n_constit_germline =
             annotations.retain_variants(&mut variants_collections, |anns| {
                 !anns.contains(&Annotation::Germline) && !anns.contains(&Annotation::SoloConstit)
@@ -136,7 +136,7 @@ impl Run for Somatic {
         annotations.callers_stat();
 
         // Annotation: BAM depth, n_alt
-        info!("Reading Constit Bam file for depth and pileup annotation.");
+        info!("Reading Constit BAM file for depth and pileup annotation.");
         variants_collections.iter().try_for_each(|c| {
             c.annotate_with_constit_bam(&annotations, &self.config.normal_bam(&id), 150)
         })?;

+ 80 - 12
src/variant/variant.rs

@@ -33,17 +33,17 @@ impl PartialEq for VcfVariant {
     }
 }
 
-impl Hash for VcfVariant {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        let mut hasher = Hasher::new();
-        hasher.update(&self.position.contig.to_ne_bytes()); // Convert position to bytes
-        hasher.update(&self.position.position.to_ne_bytes()); // Convert position to bytes
-        hasher.update(self.reference.to_string().as_bytes()); // Reference string as bytes
-        hasher.update(self.alternative.to_string().as_bytes()); // Alternative string as bytes
-        let hash = hasher.finalize();
-        state.write(&hash.as_bytes()[..16]);
-    }
-}
+// impl Hash for VcfVariant {
+//     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+//         let mut hasher = Hasher::new();
+//         hasher.update(&self.position.contig.to_ne_bytes()); // Convert position to bytes
+//         hasher.update(&self.position.position.to_ne_bytes()); // Convert position to bytes
+//         hasher.update(self.reference.to_string().as_bytes()); // Reference string as bytes
+//         hasher.update(self.alternative.to_string().as_bytes()); // Alternative string as bytes
+//         let hash = hasher.finalize();
+//         state.write(&hash.as_bytes()[..16]);
+//     }
+// }
 
 impl Eq for VcfVariant {}
 
@@ -82,7 +82,6 @@ impl FromStr for VcfVariant {
             .parse()
             .context(format!("Can't parse alternative from: {s}"))?;
 
-
         let mut hasher = blake3::Hasher::new();
         hasher.update(&position.contig.to_ne_bytes()); // Convert position to bytes
         hasher.update(&position.position.to_ne_bytes()); // Convert position to bytes
@@ -165,6 +164,37 @@ impl VcfVariant {
         }
     }
 
+    pub fn has_svtype(&self) -> bool {
+        self.infos.0.iter().any(|i| matches!(i, Info::SVTYPE(_)))
+    }
+
+    // pub fn into_sv_vcf_row() {
+    //     let vcf_position: VcfPosition = self.position.clone().into();
+    //     let (contig, position) = vcf_position.into();
+    //
+    //     let mut columns = vec![
+    //         contig,
+    //         position,
+    //         self.i.to_string(),
+    //         self.reference.to_string(),
+    //         self.alternative.to_string(),
+    //         self.quality
+    //             .map(|v| v.to_string())
+    //             .unwrap_or(".".to_string()),
+    //         self.filter.to_string(),
+    //         self.infos.to_string(),
+    //     ];
+    //
+    //     if !self.formats.0.is_empty() {
+    //         let (format, values) = self.formats.clone().into();
+    //         columns.push(format);
+    //         columns.push(values);
+    //     }
+    //
+    //     columns.join("\t")
+    //
+    // }
+
     // pub fn hash_variant(&self) -> Hash128 {
     //     // Create a new BLAKE3 hasher
     //     let mut hasher = blake3::Hasher::new();
@@ -308,6 +338,12 @@ pub enum Info {
     RCU(u32),
     RGU(u32),
     RTU(u32),
+    SVTYPE(String),
+    SVLEN(i32),
+    END(u32),
+    MATEID(String),
+    SVINSLEN(u32),
+    SVINSSEQ(String),
 }
 
 impl FromStr for Info {
@@ -359,6 +395,24 @@ impl FromStr for Info {
                         .parse()
                         .context(format!("Can't parse into u32: {value}"))?,
                 ),
+                "SVTYPE" => Info::SVTYPE(value.to_string()),
+                "SVLEN" => Info::SVLEN(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into i32: {value}"))?,
+                ),
+                "END" => Info::END(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "MATEID" => Info::MATEID(value.to_string()),
+                "SVINSLEN" => Info::SVINSLEN(
+                    value
+                        .parse()
+                        .context(format!("Can't parse into u32: {value}"))?,
+                ),
+                "SVINSSEQ" => Info::SVINSSEQ(value.to_string()),
 
                 _ => Info::Empty,
             })
@@ -389,6 +443,12 @@ impl fmt::Display for Info {
             Info::RCU(v) => write!(f, "RCU={v}"),
             Info::RGU(v) => write!(f, "RGU={v}"),
             Info::RTU(v) => write!(f, "RTU={v}"),
+            Info::SVTYPE(v) => write!(f, "SVTYPE={v}"),
+            Info::SVLEN(v) => write!(f, "SVLEN={v}"),
+            Info::END(v) => write!(f, "END={v}"),
+            Info::MATEID(v) => write!(f, "MATEID={v}"),
+            Info::SVINSLEN(v) => write!(f, "SVINSLEN={v}"),
+            Info::SVINSSEQ(v) => write!(f, "SVINSSEQ={v}"),
         }
     }
 }
@@ -418,6 +478,10 @@ pub enum Format {
     NGU(u32),
     NTU(u32),
 
+    // nanomonsv
+    TR(u32),
+    VR(u32),
+
     Other((String, String)), // (key, value)
 }
 
@@ -481,6 +545,8 @@ impl TryFrom<(&str, &str)> for Format {
                     .map(|e| e.parse().context("Failed to parse AD"))
                     .collect::<anyhow::Result<Vec<_>>>()?,
             ),
+            "TR" => Format::TR(value.parse()?),
+            "VR" => Format::VR(value.parse()?),
             _ => Format::Other((key.to_string(), value.to_string())),
         })
     }
@@ -516,6 +582,8 @@ impl From<Format> for (String, String) {
             Format::NCU(value) => ("NCU".to_string(), value.to_string()),
             Format::NGU(value) => ("NGU".to_string(), value.to_string()),
             Format::NTU(value) => ("NTU".to_string(), value.to_string()),
+            Format::TR(value) => ("TR".to_string(), value.to_string()),
+            Format::VR(value) => ("VR".to_string(), value.to_string()),
         }
     }
 }

+ 173 - 59
src/variant/variant_collection.rs

@@ -6,7 +6,7 @@ use std::{
 
 use anyhow::Context;
 use csv::ReaderBuilder;
-use log::{info, warn};
+use log::{debug, info, warn};
 use rayon::prelude::*;
 use uuid::Uuid;
 
@@ -42,8 +42,7 @@ impl VariantCollection {
     }
 
     pub fn retain_keys(&mut self, keys_to_keep: &HashSet<Hash128>) {
-        self.variants
-            .retain(|v| keys_to_keep.contains(&v.hash()));
+        self.variants.retain(|v| keys_to_keep.contains(&v.hash()));
     }
 
     pub fn remove_keys(&mut self, keys_to_remove: &HashSet<Hash128>) {
@@ -233,7 +232,7 @@ impl ExternalAnnotation {
         let mut unfound = Vec::new();
 
         for variant in variants {
-            let hash  = variant.hash();
+            let hash = variant.hash();
             let mut has_pushed = false;
 
             // Check COSMIC
@@ -305,8 +304,14 @@ impl ExternalAnnotation {
 
         let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
 
+        let min_chunk_size = 1000;
+        let max_chunks = 150;
+
+        let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
+        let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
+
         let results = unfound
-            .par_chunks(unfound.len() / 33)
+            .par_chunks(optimal_chunk_size)
             .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
                 let in_tmp = temp_dir.join(format!("echtvar_in_{}.vcf", Uuid::new_v4()));
                 let out_tmp = temp_dir.join(format!("echtvar_out_{}.vcf.gz", Uuid::new_v4()));
@@ -450,72 +455,181 @@ impl ExternalAnnotation {
 
         let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
 
-        let results = unfound
-            .par_chunks(unfound.len() / 33)
-            .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
-                let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
-                let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
+        let (sv, unfound): (Vec<VcfVariant>, Vec<VcfVariant>) =
+            unfound.into_iter().partition(|v| v.has_svtype());
 
-                // Write input VCF
-                let mut vcf = File::create(&in_tmp)?;
-                writeln!(vcf, "{}", header)?;
-                for (i, row) in chunk.iter().enumerate() {
-                    writeln!(
-                        vcf,
-                        "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
-                        row.position.contig(),
-                        row.position.position + 1, // vcf
-                        i + 1,
-                        row.reference,
-                        row.alternative
-                    )?;
-                }
+        warn!("SV {}", sv.len());
 
-                run_vep(&in_tmp, &out_vep)?;
+        let min_chunk_size = 1000;
+        let max_chunks = 150;
+
+        let mut results = if !unfound.is_empty() {
+            let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
+            let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
+
+            unfound
+                .par_chunks(optimal_chunk_size)
+                .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
+                    let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
+                    let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
+                    let out_summary = format!("{out_vep}_summary.html");
+                    let out_warnings = format!("{out_vep}_warnings.txt");
+
+                    // Write input VCF
+                    let mut vcf = File::create(&in_tmp)?;
+                    writeln!(vcf, "{}", header)?;
+                    for (i, row) in chunk.iter().enumerate() {
+                        writeln!(
+                            vcf,
+                            "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
+                            row.position.contig(),
+                            row.position.position + 1, // vcf
+                            i + 1,
+                            row.reference,
+                            row.alternative
+                        )?;
+                    }
 
-                let mut reader_vep = ReaderBuilder::new()
-                    .delimiter(b'\t')
-                    .has_headers(false)
-                    .comment(Some(b'#'))
-                    .flexible(true)
-                    .from_reader(fs::File::open(out_vep.clone())?);
+                    run_vep(&in_tmp, &out_vep)?;
 
-                let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
-                for line in reader_vep.deserialize() {
-                    let line: VepLine = line.context("Failed to deserialize VepLine")?;
-                    let key = line
-                        .uploaded_variation
-                        .parse::<u64>()
-                        .context("Failed to parse uploaded_variation as u64")?;
+                    let mut reader_vep = ReaderBuilder::new()
+                        .delimiter(b'\t')
+                        .has_headers(false)
+                        .comment(Some(b'#'))
+                        .flexible(true)
+                        .from_reader(fs::File::open(out_vep.clone())?);
 
-                    lines.entry(key).or_default().push(line);
-                }
+                    let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
+                    for line in reader_vep.deserialize() {
+                        let line: VepLine = line.context("Failed to deserialize VepLine")?;
+                        let key = line
+                            .uploaded_variation
+                            .parse::<u64>()
+                            .context("Failed to parse uploaded_variation as u64")?;
 
-                fs::remove_file(in_tmp)?;
-                fs::remove_file(out_vep)?;
+                        lines.entry(key).or_default().push(line);
+                    }
+
+                    fs::remove_file(in_tmp)?;
+                    fs::remove_file(out_vep)?;
 
-                let mut n_not_vep = 0;
-                let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
+                    let mut n_not_vep = 0;
+                    let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
 
-                chunk.iter().enumerate().for_each(|(i, entry)| {
-                    let k = (i + 1) as u64;
+                    chunk.iter().enumerate().for_each(|(i, entry)| {
+                        let k = (i + 1) as u64;
 
-                    if let Some(vep_lines) = lines.get(&k) {
-                        if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
-                            chunk_results.push((entry.hash(), veps));
+                        if let Some(vep_lines) = lines.get(&k) {
+                            if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
+                                chunk_results.push((entry.hash(), veps));
+                            }
+                        } else {
+                            warn!(
+                                "No VEP entry for {}:{}>{}",
+                                entry.position.to_string(),
+                                entry.reference.to_string(),
+                                entry.alternative.to_string()
+                            );
+                            n_not_vep += 1;
                         }
-                    } else {
-                        n_not_vep += 1;
+                    });
+
+                    if n_not_vep > 0 {
+                        debug!("{n_not_vep} variants not annotated by VEP.");
+                        let warnings = fs::read_to_string(&out_warnings)
+                            .context(format!("Can't read VEP warnings: {out_warnings}"))?;
+                        warn!("VEP warnings:\n{warnings}");
+                    }
+                    fs::remove_file(out_warnings)?;
+                    fs::remove_file(out_summary)?;
+                    Ok(chunk_results)
+                })
+                .flatten()
+                .collect::<Vec<_>>()
+        } else {
+            Vec::new()
+        };
+
+        if !sv.is_empty() {
+            let optimal_chunk_size = sv.len().div_ceil(max_chunks as usize);
+            let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
+
+            let results_sv = sv
+                .par_chunks(optimal_chunk_size)
+                .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
+                    let in_tmp = temp_file_path(".vcf")?.to_str().unwrap().to_string();
+                    let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
+                    let out_summary = format!("{out_vep}_summary.html");
+                    let out_warnings = format!("{out_vep}_warnings.txt");
+
+                    // Write input VCF
+                    let mut vcf = File::create(&in_tmp)?;
+                    writeln!(vcf, "{}", header)?;
+                    for (i, mut row) in chunk.iter().cloned().enumerate() {
+                        row.id = (i + 1).to_string();
+                        let s = row.into_vcf_row();
+                        writeln!(vcf, "{s}",)?;
                     }
-                });
 
-                if n_not_vep > 0 {
-                    warn!("{n_not_vep} variants not annotated by VEP");
-                }
-                Ok(chunk_results)
-            })
-            .flatten()
-            .collect::<Vec<_>>();
+                    run_vep(&in_tmp, &out_vep)?;
+
+                    let mut reader_vep = ReaderBuilder::new()
+                        .delimiter(b'\t')
+                        .has_headers(false)
+                        .comment(Some(b'#'))
+                        .flexible(true)
+                        .from_reader(fs::File::open(out_vep.clone())?);
+
+                    let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
+                    for line in reader_vep.deserialize() {
+                        let line: VepLine = line.context("Failed to deserialize VepLine")?;
+                        let key = line
+                            .uploaded_variation
+                            .parse::<u64>()
+                            .context("Failed to parse uploaded_variation as u64")?;
+
+                        lines.entry(key).or_default().push(line);
+                    }
+
+                    fs::remove_file(in_tmp)?;
+                    fs::remove_file(out_vep)?;
+
+                    let mut n_not_vep = 0;
+                    let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
+
+                    chunk.iter().enumerate().for_each(|(i, entry)| {
+                        let k = (i + 1) as u64;
+
+                        if let Some(vep_lines) = lines.get(&k) {
+                            if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
+                                chunk_results.push((entry.hash(), veps));
+                            }
+                        } else {
+                            warn!(
+                                "No VEP entry for {}\t{}\t{}",
+                                entry.position.to_string(),
+                                entry.reference.to_string(),
+                                entry.alternative.to_string()
+                            );
+                            n_not_vep += 1;
+                        }
+                    });
+
+                    if n_not_vep > 0 {
+                        debug!("{n_not_vep} variants not annotated by VEP.");
+                        let warnings = fs::read_to_string(&out_warnings)
+                            .context(format!("Can't read VEP warnings: {out_warnings}"))?;
+                        warn!("VEP warnings:\n{warnings}");
+                    }
+                    fs::remove_file(out_warnings)?;
+                    fs::remove_file(out_summary)?;
+                    Ok(chunk_results)
+                })
+                .flatten()
+                .collect::<Vec<_>>();
+
+            results.extend(results_sv);
+        }
 
         for (hash, veps) in results {
             // self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;