Browse Source

multiallelic normalization (just for INFO FREQ of dbsnp) for fetch_vcf, prefer normalized vcf...

Thomas 1 tháng trước cách đây
mục cha
commit
a0f6db44b0

+ 105 - 0
src/annotation/dbsnp.rs

@@ -0,0 +1,105 @@
+use std::{fmt, str::FromStr};
+
+use bitcode::{Decode, Encode};
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Encode, Decode)]
+pub struct DbSnpFreqEntry {
+    pub source: String,
+    pub values: Vec<Option<f64>>,
+}
+
+impl fmt::Display for DbSnpFreqEntry {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let values = self
+            .values
+            .iter()
+            .map(|v| match v {
+                Some(v) => v.to_string(),
+                None => ".".to_string(),
+            })
+            .collect::<Vec<_>>()
+            .join(",");
+
+        write!(f, "{}:{}", self.source, values)
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Encode, Decode)]
+pub struct DbSnpFreq(pub Vec<DbSnpFreqEntry>);
+
+impl FromStr for DbSnpFreq {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> anyhow::Result<Self> {
+        let entries = s
+            .split('|')
+            .filter(|x| !x.is_empty())
+            .map(|entry| {
+                let (source, values) = entry
+                    .split_once(':')
+                    .ok_or_else(|| anyhow::anyhow!("Invalid FREQ entry: {entry}"))?;
+
+                let values = values
+                    .split(',')
+                    .map(|v| {
+                        if v == "." {
+                            Ok(None)
+                        } else {
+                            v.parse::<f64>().map(Some).map_err(|e| {
+                                anyhow::anyhow!("Invalid FREQ value `{v}` in {source}: {e}")
+                            })
+                        }
+                    })
+                    .collect::<anyhow::Result<Vec<_>>>()?;
+
+                Ok(DbSnpFreqEntry {
+                    source: source.to_string(),
+                    values,
+                })
+            })
+            .collect::<anyhow::Result<Vec<_>>>()?;
+
+        Ok(DbSnpFreq(entries))
+    }
+}
+
+impl fmt::Display for DbSnpFreq {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let s = self
+            .0
+            .iter()
+            .map(|e| e.to_string())
+            .collect::<Vec<_>>()
+            .join("|");
+
+        write!(f, "{s}")
+    }
+}
+
+impl DbSnpFreq {
+    /// Average ALT frequency across real population sources in a dbSNP FREQ field.
+    pub fn maf(&self) -> Option<f32> {
+        const EXCLUDED: &[&str] = &["SGDP_PRJ", "dbGaP_PopFreq"];
+
+        let (sum, count) = self
+            .0
+            .iter()
+            .filter_map(|entry| {
+                if EXCLUDED.contains(&entry.source.as_str()) {
+                    return None;
+                }
+
+                let alt = entry.values.get(1).copied().flatten()? as f32;
+
+                if alt <= 0.0 {
+                    None
+                } else {
+                    Some(alt)
+                }
+            })
+            .fold((0.0_f32, 0usize), |(s, c), af| (s + af, c + 1));
+
+        (count > 0).then_some(sum / count as f32)
+    }
+}

+ 1 - 1
src/annotation/echtvar.rs

@@ -149,7 +149,7 @@ mod tests {
             annotations.insert_update(v.hash(), std::slice::from_ref(&caller));
         });
 
-        let ext_annot = ExternalAnnotation::init(&config)?;
+        let ext_annot = ExternalAnnotation::init("TEST", &config)?;
         ext_annot.annotate(&variants, &annotations)?;
 
         println!("{annotations:#?}");

+ 1 - 0
src/annotation/mod.rs

@@ -4,6 +4,7 @@ pub mod echtvar;
 pub mod gnomad;
 pub mod ncbi;
 pub mod vep;
+pub mod dbsnp;
 
 use std::{
     collections::{HashMap, HashSet},

+ 32 - 17
src/annotation/vep.rs

@@ -1,4 +1,4 @@
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use bitcode::{Decode, Encode};
 use hashbrown::HashMap;
 use itertools::Itertools;
@@ -15,6 +15,8 @@ use crate::{
     commands::{Command as JobCommand, LocalBatchRunner, LocalRunner, SbatchRunner},
     config::Config,
     helpers::singularity_bind_flags,
+    run,
+    runners::Run,
 };
 
 use super::ncbi::NCBIAcc;
@@ -642,17 +644,28 @@ impl FromStr for VEPExtra {
 
 #[derive(Debug)]
 pub struct VepJob {
+    case_id: String,
     in_vcf: PathBuf,
     out_vcf: PathBuf,
     config: Config,
+    log_dir: String,
 }
 
 impl VepJob {
-    pub fn new(in_path: impl AsRef<Path>, out_path: impl AsRef<Path>, config: &Config) -> Self {
+    pub fn new(
+        in_path: impl AsRef<Path>,
+        out_path: impl AsRef<Path>,
+        case_id: &str,
+        config: &Config,
+    ) -> Self {
+        let log_dir = format!("{}/{}/log/VEP", config.result_dir, &case_id);
+
         VepJob {
+            case_id: case_id.to_string(),
             in_vcf: in_path.as_ref().to_path_buf(),
             out_vcf: out_path.as_ref().to_path_buf(),
             config: config.clone(),
+            log_dir,
         }
     }
 }
@@ -710,16 +723,17 @@ impl SbatchRunner for VepJob {
     }
 }
 
-// impl Run for VepJob {
-//     fn run(&mut self) -> anyhow::Result<()> {
-//         let report = run!(&self.config, self).with_context(|| format!("Failed to filter PASS for {}", self.id))?;
-//         report
-//             .save_to_file(format!("{}/bcftools_pass_", self.config.log_dir))
-//             .context("Can't save bcftools PASS logs")?;;
-//         Ok(())
-//     }
-//     // add code here
-// }
+impl Run for VepJob {
+    fn run(&mut self) -> anyhow::Result<()> {
+        let report = run!(&self.config, self)
+            .with_context(|| format!("Failed VEP chunk for {}", self.case_id))?;
+        report
+            .save_to_file(format!("{}/VEP_chunk_", self.log_dir))
+            .context("Can't save VEP logs")?;
+        Ok(())
+    }
+    // add code here
+}
 
 /// Selects the "best" VEP annotation from a list, based on biological impact and transcript priority.
 ///
@@ -791,15 +805,16 @@ pub fn get_best_vep(d: &[VEP]) -> anyhow::Result<VEP> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{helpers::test_init, run};
+    use crate::helpers::test_init;
 
     #[test]
     fn vep_run() -> anyhow::Result<()> {
         test_init();
         let config = Config::default();
-        let mut vep_job = VepJob { in_vcf: "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/ClairS/DUMCO_diag_clairs_PASSED.vcf.gz".into(), out_vcf: "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/TEST_clairs_PASSED_VEP.vcf".into(), config: config.clone() };
-        let r = run!(config, &mut vep_job)?;
-        println!("{r:#?}");
-        Ok(())
+        let mut vep_job = VepJob::new("/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/ClairS/DUMCO_diag_clairs_PASSED.vcf.gz", "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/TEST_clairs_PASSED_VEP.vcf", "TEST", &config);
+        crate::runners::Run::run(&mut vep_job)
+        // let mut vep_job = VepJob { in_vcf: "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/ClairS/DUMCO_diag_clairs_PASSED.vcf.gz".into(), out_vcf: "/mnt/beegfs02/scratch/t_steimle/data/wgs/DUMCO/diag/TEST_clairs_PASSED_VEP.vcf".into(), config: config.clone() };
+        // let r = run!(config, &mut vep_job)?;
+        // println!("{r:#?}");
     }
 }

+ 8 - 4
src/io/vcf.rs

@@ -24,7 +24,7 @@ use noodles_core::Position;
 use crate::{
     io::writers::{BgzTabixWriter, IndexFormat},
     positions::GenomeRange,
-    variant::vcf_variant::VcfVariant,
+    variant::vcf_variant::{from_multiallelic, VcfVariant},
 };
 
 use super::{
@@ -154,7 +154,9 @@ pub fn fetch_vcf(bgz_path: &str, region: &GenomeRange) -> anyhow::Result<Vec<Vcf
             .parse()
             .with_context(|| format!("failed to parse VCF line: {line}"))?;
         if v.position.contig == region.contig && region.range.contains(&v.position.position) {
-            variants.push(v);
+            for split in from_multiallelic(v) {
+                variants.push(split);
+            }
         }
         Ok(())
     })?;
@@ -259,8 +261,10 @@ mod tests {
     use super::*;
 
     #[test]
-    fn vcf_dbsnp() {
+    fn vcf_dbsnp() -> anyhow::Result<()> {
         let config = Config::default();
-        fetch_vcf(conf.db_snp, &GenomeRange::new("chr10", , end))
+        let v = fetch_vcf(&config.db_snp, &GenomeRange::new("chr1", 5656, 5657))?;
+        println!("{v:#?}");
+        Ok(())
     }
 }

+ 3 - 3
src/pipes/somatic.rs

@@ -427,7 +427,7 @@ impl Run for SomaticPipe {
         variants_collections
             .iter()
             .try_for_each(|c| -> anyhow::Result<()> {
-                let ext_annot = ExternalAnnotation::init(&config)?;
+                let ext_annot = ExternalAnnotation::init(&id, &config)?;
                 ext_annot.annotate(&c.variants, &annotations)?;
                 Ok(())
             })?;
@@ -493,7 +493,7 @@ impl Run for SomaticPipe {
         variants_collections
             .iter()
             .try_for_each(|c| -> anyhow::Result<()> {
-                let ext_annot = ExternalAnnotation::init(&config)?;
+                let ext_annot = ExternalAnnotation::init(&id, &config)?;
                 ext_annot.annotate_vep(&c.variants, &annotations)?;
                 Ok(())
             })?;
@@ -548,7 +548,7 @@ pub fn const_stats(id: String, config: Config) -> anyhow::Result<()> {
     let clairs_germline = ClairS::initialize(&id, &config.clone())?.germline(&annotations)?;
 
     info!("Annotation with Cosmic and GnomAD.");
-    let ext_annot = ExternalAnnotation::init(&config)?;
+    let ext_annot = ExternalAnnotation::init(&id, &config)?;
     ext_annot.annotate(&clairs_germline.variants, &annotations)?;
 
     let mut variants = Variants::default();

+ 23 - 9
src/variant/variant_collection.rs

@@ -8,7 +8,7 @@ use std::{
 
 use anyhow::Context;
 // use bgzip::{BGZFReader, BGZFWriter};
-use crate::io::tsv::TsvLine;
+use crate::{io::tsv::TsvLine, runners::Run};
 use bitcode::{Decode, Encode};
 use dashmap::DashMap;
 use log::{debug, error, info, warn};
@@ -1672,12 +1672,13 @@ pub enum ExtAnnotationSource {
 use rusqlite::{params, Connection, Result as SqliteResult};
 
 pub struct ExternalAnnotation {
+    pub case_id: String,
     pub conn: Connection,
     pub config: Config,
 }
 
 impl ExternalAnnotation {
-    pub fn init(config: &Config) -> anyhow::Result<Self> {
+    pub fn init(case_id: &str, config: &Config) -> anyhow::Result<Self> {
         let mut db_path = app_storage_dir()?;
         db_path.push("annotations_db.sqlite");
         info!("Opening DB: {}", db_path.display());
@@ -1695,6 +1696,7 @@ impl ExternalAnnotation {
         )?;
 
         Ok(Self {
+            case_id: case_id.to_string(),
             conn,
             config: config.clone(),
         })
@@ -1945,6 +1947,7 @@ impl ExternalAnnotation {
         let max_chunks = 150;
 
         let config: &Config = &self.config;
+        let case_id = self.case_id.clone();
 
         let mut results: Vec<(Hash128, Vec<VEP>)> = if !unfound.is_empty() {
             let optimal_chunk_size = unfound
@@ -1958,7 +1961,7 @@ impl ExternalAnnotation {
                 .enumerate()
                 .map(|(chunk_i, chunk)| {
                     debug!("Processing chunk {chunk_i}");
-                    process_vep_chunk(chunk, &header, config).map_err(|e| {
+                    process_vep_chunk(&case_id, chunk, &header, config).map_err(|e| {
                         error!("Error processing chunk {chunk_i}: {e}");
                         e
                     })
@@ -2004,7 +2007,7 @@ impl ExternalAnnotation {
                             }
                         }
 
-                        let mut vep_job = VepJob::new(&in_tmp, &out_vep, config);
+                        let mut vep_job = VepJob::new(&in_tmp, &out_vep, &case_id, config);
                         run!(config, &mut vep_job).context("Error while running VEP.")?;
 
                         let mut vep_rdr = std::io::BufReader::new(
@@ -2095,6 +2098,7 @@ impl ExternalAnnotation {
 }
 
 fn process_vep_chunk(
+    case_id: &str,
     chunk: &[VcfVariant],
     header: &str,
     config: &Config,
@@ -2124,11 +2128,21 @@ fn process_vep_chunk(
         )?;
     }
 
-    let mut vep_job = VepJob::new(&in_tmp, &out_vep, config);
-    if let Err(e) = run!(config, &mut vep_job) {
-        error!("VEP error: {e}");
-        return Err(anyhow::anyhow!("VEP execution failed: {}", e));
-    }
+    let mut vep_job = VepJob::new(&in_tmp, &out_vep, &case_id, config);
+    vep_job.run();
+    // match run!(config, &mut vep_job) {
+    //     Ok(c) => {
+    //         c.save_to_file(format!("{}/{}/log/"), config.result_dir, );
+    //     },
+    //     Err(e) => {
+    //         error!("VEP error: {e}");
+    //         return Err(anyhow::anyhow!("VEP execution failed: {}", e));
+    //     }
+    // }
+    // if let Err(e) = run!(config, &mut vep_job) {
+    //     error!("VEP error: {e}");
+    //     return Err(anyhow::anyhow!("VEP execution failed: {}", e));
+    // }
 
     let mut vep_rdr = std::io::BufReader::new(fs::File::open(&out_vep)?);
     let mut vep_line = TsvLine::new();

+ 160 - 41
src/variant/vcf_variant.rs

@@ -107,7 +107,10 @@
 //! ```
 
 use crate::{
-    annotation::Annotations,
+    annotation::{
+        dbsnp::{DbSnpFreq, DbSnpFreqEntry},
+        Annotations,
+    },
     helpers::{estimate_shannon_entropy, mean, revcomp, Hash128},
     io::fasta::sequence_range,
     pipes::ShouldRun,
@@ -826,6 +829,154 @@ impl core::fmt::Display for BNDDesc {
     }
 }
 
+// --------------------- Normalize Multiallelic
+// #[derive(Debug, Clone, PartialEq)]
+// pub enum VcfNumber {
+//     A,            // one per ALT allele
+//     R,            // one per allele (REF + ALTs)
+//     G,            // one per genotype — leave alone
+//     Dot,          // unknown — leave alone
+//     Fixed(usize), // fixed count — leave alone
+// }
+
+/// Split a multiallelic VcfVariant into N biallelic VcfVariants.
+/// Rewrites A/R-number INFO fields per allele using header metadata.
+pub fn from_multiallelic(variant: VcfVariant) -> Vec<VcfVariant> {
+    let alt_str = variant.alternative.to_string();
+    let alts: Vec<&str> = alt_str.split(',').collect();
+
+    // Already biallelic
+    if alts.len() == 1 {
+        return vec![variant];
+    }
+
+    alts.iter()
+        .enumerate()
+        .map(|(i, alt)| {
+            let mut v = variant.clone();
+
+            // Rewrite ALT
+            v.alternative = alt.parse().expect("Failed to parse split ALT");
+
+            // Rewrite INFO fields (A/R-number)
+            v.infos = rewrite_infos(&variant.infos, i);
+
+            // Rewrite FORMAT/GT
+            v.formats = rewrite_formats(&variant.formats, i);
+
+            // Recompute hash (pos + ref + new alt)
+            let mut hasher = blake3::Hasher::new();
+            hasher.update(&v.position.contig.to_ne_bytes());
+            hasher.update(&v.position.position.to_ne_bytes());
+            hasher.update(v.reference.to_string().as_bytes());
+            hasher.update(v.alternative.to_string().as_bytes());
+            let hash = hasher.finalize();
+            v.hash = Hash128::new(hash.as_bytes()[..16].try_into().unwrap());
+
+            v
+        })
+        .collect()
+}
+
+fn rewrite_infos(infos: &Infos, allele_idx: usize) -> Infos {
+    let rewritten = infos
+        .0
+        .iter()
+        .map(|info| {
+            match info {
+                Info::FREQ(DbSnpFreq(entries)) => {
+                    let new_entries = entries
+                        .iter()
+                        .map(|entry| {
+                            // R-length: [REF, ALT1, ALT2, ...] → keep REF + allele_idx-th ALT
+                            let ref_val = entry.values.first().copied().flatten();
+                            let alt_val = entry.values.get(allele_idx + 1).copied().flatten();
+                            DbSnpFreqEntry {
+                                source: entry.source.clone(),
+                                values: vec![ref_val, alt_val],
+                            }
+                        })
+                        .collect();
+                    Info::FREQ(DbSnpFreq(new_entries))
+                }
+                // All other typed Info variants: pass through unchanged
+                other => other.clone(),
+            }
+        })
+        .collect();
+
+    Infos(rewritten)
+}
+
+fn rewrite_formats(formats: &Formats, allele_idx: usize) -> Formats {
+    if formats.0.is_empty() {
+        return formats.clone();
+    }
+    // Reconstruct a fake FORMAT+sample row to reuse existing FromStr
+    let (fmt_str, sample_str): (String, String) = formats.clone().into();
+    let fmt_keys: Vec<&str> = fmt_str.split(':').collect();
+    let rewritten_sample = rewrite_sample(&sample_str, &fmt_keys, allele_idx);
+    (fmt_str.as_str(), rewritten_sample.as_str())
+        .try_into()
+        .expect("Failed to parse rewritten FORMAT")
+}
+
+/// Recode a sample's GT from multiallelic to biallelic representation.
+/// e.g. GT "0/2" with allele_idx=1 → "0/1"  (allele 2 becomes allele 1 in new row)
+///      GT "1/2" with allele_idx=1 → "0/1"  (allele 1 in original becomes REF-equivalent here)
+///      GT "2/2" with allele_idx=1 → "1/1"
+fn rewrite_sample(sample: &str, format_keys: &[&str], allele_idx: usize) -> String {
+    let values: Vec<&str> = sample.split(':').collect();
+    let mut out = Vec::with_capacity(values.len());
+
+    for (k, &key) in format_keys.iter().enumerate() {
+        let val = values.get(k).copied().unwrap_or(".");
+        if key == "GT" {
+            out.push(recode_gt(val, allele_idx));
+        } else {
+            out.push(val.to_string());
+        }
+    }
+
+    out.join(":")
+}
+
+/// Recode GT for one allele split.
+/// Original ALT allele (allele_idx + 1) → 1; all others → 0; missing → .
+fn recode_gt(gt: &str, allele_idx: usize) -> String {
+    let phased = gt.contains('|');
+    let sep = if phased { '|' } else { '/' };
+    let target = allele_idx + 1; // 1-based ALT index in original
+
+    gt.split(sep)
+        .map(|a| match a {
+            "." => ".".to_string(),
+            a => {
+                let n: usize = a.parse().unwrap_or(0);
+                if n == target {
+                    "1".to_string()
+                } else {
+                    "0".to_string()
+                }
+            }
+        })
+        .collect::<Vec<_>>()
+        .join(&sep.to_string())
+}
+
+fn pick_nth(values: &str, idx: usize) -> &str {
+    values.split(',').nth(idx).unwrap_or(".")
+}
+
+fn pick_r(values: &str, allele_idx: usize) -> String {
+    let mut it = values.split(',');
+    let ref_val = it.next().unwrap_or(".");
+    let alt_val = it.nth(allele_idx).unwrap_or(".");
+    format!("{ref_val},{alt_val}")
+}
+
+// ---------------------- VcfVariant SV graph
+
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{IntoNodeIdentifiers, NodeIndexable};
 use petgraph::Direction;
@@ -1321,8 +1472,8 @@ impl Infos {
 
     pub fn freq_maf(&self) -> Option<f32> {
         self.0.iter().find_map(|i| {
-            if let Info::FREQ(s) = i {
-                parse_maf_from_freq(s)
+            if let Info::FREQ(freq) = i {
+                freq.maf()
             } else {
                 None
             }
@@ -1330,42 +1481,6 @@ impl Infos {
     }
 }
 
-/// Average ALT frequency across real population sources in a dbSNP FREQ field.
-///
-/// Format: `KOREAN:0.92,0.08|TOMMO:0.94,0.06|SGDP_PRJ:0.5,0.5|...`
-/// - Index 0 = REF freq, index 1 = ALT freq
-/// - Excludes SGDP_PRJ (encodes presence as 0.5, not real frequency)
-/// - Excludes dbGaP_PopFreq (ascertainment bias from disease cohorts)
-/// - Skips sources with ALT freq == 0.0 (not observed, not absent)
-///
-/// Returns `None` if no valid sources found.
-pub fn parse_maf_from_freq(freq: &str) -> Option<f32> {
-    const EXCLUDED: &[&str] = &["SGDP_PRJ", "dbGaP_PopFreq"];
-
-    let (sum, count) = freq
-        .split('|')
-        .filter_map(|source| {
-            let (name, alleles) = source.split_once(':')?;
-            if EXCLUDED.contains(&name) {
-                return None;
-            }
-            let alt = alleles.split(',').nth(1)?.parse::<f32>().ok()?;
-            // 0.0 means not observed in this source, skip rather than pulling MAF down
-            if alt <= 0.0 {
-                None
-            } else {
-                Some(alt)
-            }
-        })
-        .fold((0.0_f32, 0usize), |(s, c), af| (s + af, c + 1));
-
-    if count == 0 {
-        None
-    } else {
-        Some(sum / count as f32)
-    }
-}
-
 /// Enum representing a single INFO field in a VCF record.
 ///
 /// Supports both standard fields and Severus-specific structural variant annotations.
@@ -1441,7 +1556,7 @@ pub enum Info {
     INSIDE_VNTR(String),
     ALINGED_POS(String),
     // dbSNP
-    FREQ(String),
+    FREQ(DbSnpFreq),
     COMMON,
     RS(u32),
 }
@@ -1524,7 +1639,11 @@ impl FromStr for Info {
                 "MATE_ID" => Info::MATE_ID(value.to_string()),
                 "INSIDE_VNTR" => Info::INSIDE_VNTR(value.to_string()),
                 "ALINGED_POS" => Info::ALINGED_POS(value.to_string()),
-                "FREQ" => Info::FREQ(value.to_string()),
+                "FREQ" => Info::FREQ(
+                    value
+                        .parse()
+                        .with_context(|| format!("Failed to parse FREQ: `{value}`"))?,
+                ),
                 "RS" => Info::RS(parse_value(value, key)?),
 
                 _ => Info::Empty,