|
@@ -2,12 +2,13 @@ use std::{
|
|
|
collections::{HashMap, HashSet},
|
|
collections::{HashMap, HashSet},
|
|
|
fs::{self, File},
|
|
fs::{self, File},
|
|
|
io::Write,
|
|
io::Write,
|
|
|
|
|
+ path::Path,
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
use anyhow::Context;
|
|
use anyhow::Context;
|
|
|
use bgzip::{BGZFReader, BGZFWriter};
|
|
use bgzip::{BGZFReader, BGZFWriter};
|
|
|
use csv::ReaderBuilder;
|
|
use csv::ReaderBuilder;
|
|
|
-use log::{debug, info, warn};
|
|
|
|
|
|
|
+use log::{debug, error, info, warn};
|
|
|
use rayon::prelude::*;
|
|
use rayon::prelude::*;
|
|
|
use serde::{Deserialize, Serialize};
|
|
use serde::{Deserialize, Serialize};
|
|
|
use uuid::Uuid;
|
|
use uuid::Uuid;
|
|
@@ -207,6 +208,21 @@ impl PartialEq for Variant {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+impl Variant {
|
|
|
|
|
+ pub fn vep(&self) -> Vec<VEP> {
|
|
|
|
|
+ self.annotations
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .flat_map(|a| {
|
|
|
|
|
+ if let Annotation::VEP(v) = a {
|
|
|
|
|
+ v.to_vec()
|
|
|
|
|
+ } else {
|
|
|
|
|
+ vec![]
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ .collect()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
|
|
pub struct Variants {
|
|
pub struct Variants {
|
|
|
pub data: Vec<Variant>,
|
|
pub data: Vec<Variant>,
|
|
@@ -572,6 +588,7 @@ impl ExternalAnnotation {
|
|
|
) -> anyhow::Result<()> {
|
|
) -> anyhow::Result<()> {
|
|
|
unfound.par_sort();
|
|
unfound.par_sort();
|
|
|
unfound.dedup();
|
|
unfound.dedup();
|
|
|
|
|
+ info!("{} variants to annotate with VEP.", unfound.len());
|
|
|
|
|
|
|
|
let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
|
|
let header = vcf_header("/data/ref/hs1/chm13v2.0.dict")?.join("\n");
|
|
|
|
|
|
|
@@ -583,87 +600,23 @@ impl ExternalAnnotation {
|
|
|
let min_chunk_size = 1000;
|
|
let min_chunk_size = 1000;
|
|
|
let max_chunks = 150;
|
|
let max_chunks = 150;
|
|
|
|
|
|
|
|
- let mut results = if !unfound.is_empty() {
|
|
|
|
|
|
|
+ let mut results: Vec<(Hash128, Vec<VEP>)> = if !unfound.is_empty() {
|
|
|
let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
|
|
let optimal_chunk_size = unfound.len().div_ceil(max_chunks as usize);
|
|
|
let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
|
|
let optimal_chunk_size = optimal_chunk_size.max(min_chunk_size);
|
|
|
|
|
|
|
|
|
|
+ debug!("{} chunks to process.", unfound.len() / optimal_chunk_size);
|
|
|
unfound
|
|
unfound
|
|
|
.par_chunks(optimal_chunk_size)
|
|
.par_chunks(optimal_chunk_size)
|
|
|
- .flat_map(|chunk| -> anyhow::Result<Vec<_>> {
|
|
|
|
|
- let in_tmp = temp_file_path("vcf")?.to_str().unwrap().to_string();
|
|
|
|
|
- let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
|
|
|
|
|
- let out_summary = format!("{out_vep}_summary.html");
|
|
|
|
|
- let out_warnings = format!("{out_vep}_warnings.txt");
|
|
|
|
|
-
|
|
|
|
|
- // Write input VCF
|
|
|
|
|
- let mut vcf = File::create(&in_tmp)?;
|
|
|
|
|
- writeln!(vcf, "{}", header)?;
|
|
|
|
|
- for (i, row) in chunk.iter().enumerate() {
|
|
|
|
|
- writeln!(
|
|
|
|
|
- vcf,
|
|
|
|
|
- "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
|
|
|
|
|
- row.position.contig(),
|
|
|
|
|
- row.position.position + 1, // vcf
|
|
|
|
|
- i + 1,
|
|
|
|
|
- row.reference,
|
|
|
|
|
- row.alternative
|
|
|
|
|
- )?;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- run_vep(&in_tmp, &out_vep)?;
|
|
|
|
|
-
|
|
|
|
|
- let mut reader_vep = ReaderBuilder::new()
|
|
|
|
|
- .delimiter(b'\t')
|
|
|
|
|
- .has_headers(false)
|
|
|
|
|
- .comment(Some(b'#'))
|
|
|
|
|
- .flexible(true)
|
|
|
|
|
- .from_reader(fs::File::open(out_vep.clone())?);
|
|
|
|
|
-
|
|
|
|
|
- let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
|
|
|
|
|
- for line in reader_vep.deserialize() {
|
|
|
|
|
- let line: VepLine = line.context("Failed to deserialize VepLine")?;
|
|
|
|
|
- let key = line
|
|
|
|
|
- .uploaded_variation
|
|
|
|
|
- .parse::<u64>()
|
|
|
|
|
- .context("Failed to parse uploaded_variation as u64")?;
|
|
|
|
|
-
|
|
|
|
|
- lines.entry(key).or_default().push(line);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- fs::remove_file(in_tmp)?;
|
|
|
|
|
- fs::remove_file(out_vep)?;
|
|
|
|
|
-
|
|
|
|
|
- let mut n_not_vep = 0;
|
|
|
|
|
- let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
|
|
|
|
|
-
|
|
|
|
|
- chunk.iter().enumerate().for_each(|(i, entry)| {
|
|
|
|
|
- let k = (i + 1) as u64;
|
|
|
|
|
-
|
|
|
|
|
- if let Some(vep_lines) = lines.get(&k) {
|
|
|
|
|
- if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
|
|
|
|
|
- chunk_results.push((entry.hash(), veps));
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- warn!(
|
|
|
|
|
- "No VEP entry for {}:{}>{}",
|
|
|
|
|
- entry.position.to_string(),
|
|
|
|
|
- entry.reference.to_string(),
|
|
|
|
|
- entry.alternative.to_string()
|
|
|
|
|
- );
|
|
|
|
|
- n_not_vep += 1;
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- if n_not_vep > 0 {
|
|
|
|
|
- debug!("{n_not_vep} variants not annotated by VEP.");
|
|
|
|
|
- let warnings = fs::read_to_string(&out_warnings)
|
|
|
|
|
- .context(format!("Can't read VEP warnings: {out_warnings}"))?;
|
|
|
|
|
- warn!("VEP warnings:\n{warnings}");
|
|
|
|
|
- }
|
|
|
|
|
- fs::remove_file(out_warnings)?;
|
|
|
|
|
- fs::remove_file(out_summary)?;
|
|
|
|
|
- Ok(chunk_results)
|
|
|
|
|
|
|
+ .enumerate()
|
|
|
|
|
+ .map(|(chunk_i, chunk)| {
|
|
|
|
|
+ debug!("Processing chunk {chunk_i}");
|
|
|
|
|
+ process_vep_chunk(chunk, &header).map_err(|e| {
|
|
|
|
|
+ error!("Error processing chunk {chunk_i}: {e}");
|
|
|
|
|
+ e
|
|
|
|
|
+ })
|
|
|
})
|
|
})
|
|
|
|
|
+ .collect::<Result<Vec<_>, _>>()? // Collect results into a Result<Vec<_>>
|
|
|
|
|
+ .into_iter()
|
|
|
.flatten()
|
|
.flatten()
|
|
|
.collect::<Vec<_>>()
|
|
.collect::<Vec<_>>()
|
|
|
} else {
|
|
} else {
|
|
@@ -677,13 +630,23 @@ impl ExternalAnnotation {
|
|
|
let results_sv = sv
|
|
let results_sv = sv
|
|
|
.par_chunks(optimal_chunk_size)
|
|
.par_chunks(optimal_chunk_size)
|
|
|
.flat_map(|chunk| -> anyhow::Result<Vec<_>> {
|
|
.flat_map(|chunk| -> anyhow::Result<Vec<_>> {
|
|
|
- let in_tmp = temp_file_path(".vcf")?.to_str().unwrap().to_string();
|
|
|
|
|
- let out_vep = temp_file_path("_vep.txt")?.to_str().unwrap().to_string();
|
|
|
|
|
|
|
+ let in_tmp = temp_file_path(".vcf")
|
|
|
|
|
+ .context("Can't create tmp path for in tmp")?
|
|
|
|
|
+ .to_str()
|
|
|
|
|
+ .unwrap()
|
|
|
|
|
+ .to_string();
|
|
|
|
|
+ let out_vep = temp_file_path("_vep.txt")
|
|
|
|
|
+ .context("Can't create tmp path for in tmp")?
|
|
|
|
|
+ .to_str()
|
|
|
|
|
+ .unwrap()
|
|
|
|
|
+ .to_string();
|
|
|
|
|
+
|
|
|
let out_summary = format!("{out_vep}_summary.html");
|
|
let out_summary = format!("{out_vep}_summary.html");
|
|
|
let out_warnings = format!("{out_vep}_warnings.txt");
|
|
let out_warnings = format!("{out_vep}_warnings.txt");
|
|
|
|
|
|
|
|
// Write input VCF
|
|
// Write input VCF
|
|
|
- let mut vcf = File::create(&in_tmp)?;
|
|
|
|
|
|
|
+ let mut vcf =
|
|
|
|
|
+ File::create(&in_tmp).context("Can't create input vcf file for VEP.")?;
|
|
|
writeln!(vcf, "{}", header)?;
|
|
writeln!(vcf, "{}", header)?;
|
|
|
for (i, mut row) in chunk.iter().cloned().enumerate() {
|
|
for (i, mut row) in chunk.iter().cloned().enumerate() {
|
|
|
row.id = (i + 1).to_string();
|
|
row.id = (i + 1).to_string();
|
|
@@ -691,14 +654,16 @@ impl ExternalAnnotation {
|
|
|
writeln!(vcf, "{s}",)?;
|
|
writeln!(vcf, "{s}",)?;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- run_vep(&in_tmp, &out_vep)?;
|
|
|
|
|
|
|
+ run_vep(&in_tmp, &out_vep).context("Error while running VEP.")?;
|
|
|
|
|
|
|
|
let mut reader_vep = ReaderBuilder::new()
|
|
let mut reader_vep = ReaderBuilder::new()
|
|
|
.delimiter(b'\t')
|
|
.delimiter(b'\t')
|
|
|
.has_headers(false)
|
|
.has_headers(false)
|
|
|
.comment(Some(b'#'))
|
|
.comment(Some(b'#'))
|
|
|
.flexible(true)
|
|
.flexible(true)
|
|
|
- .from_reader(fs::File::open(out_vep.clone())?);
|
|
|
|
|
|
|
+ .from_reader(
|
|
|
|
|
+ fs::File::open(&out_vep).context("Can't open VEP result file.")?,
|
|
|
|
|
+ );
|
|
|
|
|
|
|
|
let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
|
|
let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
|
|
|
for line in reader_vep.deserialize() {
|
|
for line in reader_vep.deserialize() {
|
|
@@ -711,8 +676,7 @@ impl ExternalAnnotation {
|
|
|
lines.entry(key).or_default().push(line);
|
|
lines.entry(key).or_default().push(line);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- fs::remove_file(in_tmp)?;
|
|
|
|
|
- fs::remove_file(out_vep)?;
|
|
|
|
|
|
|
+ // fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
|
|
|
|
|
|
|
|
let mut n_not_vep = 0;
|
|
let mut n_not_vep = 0;
|
|
|
let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
|
|
let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
|
|
@@ -734,6 +698,7 @@ impl ExternalAnnotation {
|
|
|
n_not_vep += 1;
|
|
n_not_vep += 1;
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
|
|
+ // fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
|
|
|
|
|
|
|
|
if n_not_vep > 0 {
|
|
if n_not_vep > 0 {
|
|
|
debug!("{n_not_vep} variants not annotated by VEP.");
|
|
debug!("{n_not_vep} variants not annotated by VEP.");
|
|
@@ -741,8 +706,14 @@ impl ExternalAnnotation {
|
|
|
.context(format!("Can't read VEP warnings: {out_warnings}"))?;
|
|
.context(format!("Can't read VEP warnings: {out_warnings}"))?;
|
|
|
warn!("VEP warnings:\n{warnings}");
|
|
warn!("VEP warnings:\n{warnings}");
|
|
|
}
|
|
}
|
|
|
- fs::remove_file(out_warnings)?;
|
|
|
|
|
- fs::remove_file(out_summary)?;
|
|
|
|
|
|
|
+ if Path::new(&out_warnings).exists() {
|
|
|
|
|
+ fs::remove_file(&out_warnings)
|
|
|
|
|
+ .context(format!("Can't remove file {out_warnings}"))?;
|
|
|
|
|
+ }
|
|
|
|
|
+ if Path::new(&out_summary).exists() {
|
|
|
|
|
+ fs::remove_file(&out_summary)
|
|
|
|
|
+ .context(format!("Can't remove file {out_summary}"))?;
|
|
|
|
|
+ }
|
|
|
Ok(chunk_results)
|
|
Ok(chunk_results)
|
|
|
})
|
|
})
|
|
|
.flatten()
|
|
.flatten()
|
|
@@ -750,6 +721,7 @@ impl ExternalAnnotation {
|
|
|
|
|
|
|
|
results.extend(results_sv);
|
|
results.extend(results_sv);
|
|
|
}
|
|
}
|
|
|
|
|
+ info!("{} total variants annotaded by VEP.", results.len());
|
|
|
|
|
|
|
|
for (hash, veps) in results {
|
|
for (hash, veps) in results {
|
|
|
// self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
|
|
// self.update_database(hash, "vep", &serde_json::to_vec(&veps)?)?;
|
|
@@ -774,3 +746,97 @@ impl ExternalAnnotation {
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+fn process_vep_chunk(
|
|
|
|
|
+ chunk: &[VcfVariant],
|
|
|
|
|
+ header: &str,
|
|
|
|
|
+) -> anyhow::Result<Vec<(Hash128, Vec<VEP>)>> {
|
|
|
|
|
+ let in_tmp = temp_file_path("vcf")?
|
|
|
|
|
+ .to_str()
|
|
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
|
|
|
|
|
+ .to_string();
|
|
|
|
|
+ let out_vep = temp_file_path("_vep.txt")?
|
|
|
|
|
+ .to_str()
|
|
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Failed to convert temp file path to string"))?
|
|
|
|
|
+ .to_string();
|
|
|
|
|
+
|
|
|
|
|
+ let out_summary = format!("{out_vep}_summary.html");
|
|
|
|
|
+ let out_warnings = format!("{out_vep}_warnings.txt");
|
|
|
|
|
+
|
|
|
|
|
+ let mut vcf = File::create(&in_tmp)?; // If this fails, the error is propagated.
|
|
|
|
|
+ writeln!(vcf, "{}", header)?; // If this fails, the error is propagated.
|
|
|
|
|
+
|
|
|
|
|
+ for (i, row) in chunk.iter().enumerate() {
|
|
|
|
|
+ writeln!(
|
|
|
|
|
+ vcf,
|
|
|
|
|
+ "{}\t{}\t{}\t{}\t{}\t.\tPASS\t.\t.\t.",
|
|
|
|
|
+ row.position.contig(),
|
|
|
|
|
+ row.position.position + 1,
|
|
|
|
|
+ i + 1,
|
|
|
|
|
+ row.reference,
|
|
|
|
|
+ row.alternative
|
|
|
|
|
+ )?;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if let Err(e) = run_vep(&in_tmp, &out_vep) {
|
|
|
|
|
+ error!("VEP error: {e}");
|
|
|
|
|
+ return Err(anyhow::anyhow!("VEP execution failed: {}", e)); // Propagate the error.
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let mut reader_vep = ReaderBuilder::new()
|
|
|
|
|
+ .delimiter(b'\t')
|
|
|
|
|
+ .has_headers(false)
|
|
|
|
|
+ .comment(Some(b'#'))
|
|
|
|
|
+ .flexible(true)
|
|
|
|
|
+ .from_reader(fs::File::open(&out_vep)?); // If this fails, the error is propagated.
|
|
|
|
|
+
|
|
|
|
|
+ let mut lines: HashMap<u64, Vec<VepLine>> = HashMap::new();
|
|
|
|
|
+ for line in reader_vep.deserialize() {
|
|
|
|
|
+ let line: VepLine = line.context("Failed to deserialize VepLine")?; // Propagate the error.
|
|
|
|
|
+ let key = line
|
|
|
|
|
+ .uploaded_variation
|
|
|
|
|
+ .parse::<u64>()
|
|
|
|
|
+ .context("Failed to parse uploaded_variation as u64")?; // Propagate the error.
|
|
|
|
|
+ lines.entry(key).or_default().push(line);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fs::remove_file(&in_tmp).context(format!("Can't remove file {in_tmp}"))?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut n_not_vep = 0;
|
|
|
|
|
+ let mut chunk_results: Vec<(Hash128, Vec<VEP>)> = Vec::new();
|
|
|
|
|
+
|
|
|
|
|
+ chunk.iter().enumerate().for_each(|(i, entry)| {
|
|
|
|
|
+ let k = (i + 1) as u64;
|
|
|
|
|
+
|
|
|
|
|
+ if let Some(vep_lines) = lines.get(&k) {
|
|
|
|
|
+ if let Ok(veps) = vep_lines.iter().map(VEP::try_from).collect() {
|
|
|
|
|
+ chunk_results.push((entry.hash(), veps));
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ warn!(
|
|
|
|
|
+ "No VEP entry for {}\t{}\t{}",
|
|
|
|
|
+ entry.position.to_string(),
|
|
|
|
|
+ entry.reference.to_string(),
|
|
|
|
|
+ entry.alternative.to_string()
|
|
|
|
|
+ );
|
|
|
|
|
+ n_not_vep += 1;
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ fs::remove_file(&out_vep).context(format!("Can't remove file {out_vep}"))?;
|
|
|
|
|
+
|
|
|
|
|
+ if n_not_vep > 0 {
|
|
|
|
|
+ debug!("{n_not_vep} variants not annotated by VEP.");
|
|
|
|
|
+ let warnings = fs::read_to_string(&out_warnings)
|
|
|
|
|
+ .context(format!("Can't read VEP warnings: {out_warnings}"))?;
|
|
|
|
|
+ warn!("VEP warnings:\n{warnings}");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if Path::new(&out_warnings).exists() {
|
|
|
|
|
+ fs::remove_file(&out_warnings).context(format!("Can't remove file {out_warnings}"))?;
|
|
|
|
|
+ }
|
|
|
|
|
+ if Path::new(&out_summary).exists() {
|
|
|
|
|
+ fs::remove_file(&out_summary).context(format!("Can't remove file {out_summary}"))?;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Ok(chunk_results) // Return the successful result.
|
|
|
|
|
+}
|