|
@@ -0,0 +1,685 @@
|
|
|
|
|
+//! Metadata and payload models for the `.pandora` SomaticPipe output container.
|
|
|
|
|
+//!
|
|
|
|
|
+//! This module intentionally defines only the format model. The writer and reader
|
|
|
|
|
+//! implementation should be added separately so the schema can stabilize first.
|
|
|
|
|
+
|
|
|
|
|
+use std::{
|
|
|
|
|
+ collections::BTreeMap,
|
|
|
|
|
+ fs::File,
|
|
|
|
|
+ io::{Read, Seek, SeekFrom, Write},
|
|
|
|
|
+ path::Path,
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+use anyhow::{bail, Context};
|
|
|
|
|
+use chrono::{DateTime, Utc};
|
|
|
|
|
+use serde::de::DeserializeOwned;
|
|
|
|
|
+use serde::{Deserialize, Serialize};
|
|
|
|
|
+
|
|
|
|
|
+use crate::{
|
|
|
|
|
+ annotation::{AnnotationsStats, VepStats},
|
|
|
|
|
+ callers::savana::SavanaCN,
|
|
|
|
|
+ collection::bam_stats::WGSBamStats,
|
|
|
|
|
+ pipes::somatic::SomaticPipeStats,
|
|
|
|
|
+ variant::{variant_collection::Variants, variants_stats::VariantsStats},
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+pub const PANDORA_MAGIC: &[u8; 8] = b"PANDORA\0";
|
|
|
|
|
+pub const PANDORA_FORMAT_VERSION: u16 = 1;
|
|
|
|
|
+pub const PANDORA_EXTENSION: &str = "pandora";
|
|
|
|
|
+pub const PANDORA_PRELUDE_LEN: usize = 8 + 2 + 8 + 32;
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct ContainerPrelude {
|
|
|
|
|
+ pub magic: [u8; 8],
|
|
|
|
|
+ pub version: u16,
|
|
|
|
|
+ pub header_len: u64,
|
|
|
|
|
+ pub header_checksum: [u8; 32],
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl Default for ContainerPrelude {
|
|
|
|
|
+ fn default() -> Self {
|
|
|
|
|
+ Self {
|
|
|
|
|
+ magic: *PANDORA_MAGIC,
|
|
|
|
|
+ version: PANDORA_FORMAT_VERSION,
|
|
|
|
|
+ header_len: 0,
|
|
|
|
|
+ header_checksum: [0; 32],
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl ContainerPrelude {
|
|
|
|
|
+ pub fn from_reader(mut reader: impl Read) -> anyhow::Result<Self> {
|
|
|
|
|
+ let mut magic = [0; 8];
|
|
|
|
|
+ reader.read_exact(&mut magic)?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut version = [0; 2];
|
|
|
|
|
+ reader.read_exact(&mut version)?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut header_len = [0; 8];
|
|
|
|
|
+ reader.read_exact(&mut header_len)?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut header_checksum = [0; 32];
|
|
|
|
|
+ reader.read_exact(&mut header_checksum)?;
|
|
|
|
|
+
|
|
|
|
|
+ Ok(Self {
|
|
|
|
|
+ magic,
|
|
|
|
|
+ version: u16::from_be_bytes(version),
|
|
|
|
|
+ header_len: u64::from_be_bytes(header_len),
|
|
|
|
|
+ header_checksum,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn write_to(&self, mut writer: impl Write) -> anyhow::Result<()> {
|
|
|
|
|
+ writer.write_all(&self.magic)?;
|
|
|
|
|
+ writer.write_all(&self.version.to_be_bytes())?;
|
|
|
|
|
+ writer.write_all(&self.header_len.to_be_bytes())?;
|
|
|
|
|
+ writer.write_all(&self.header_checksum)?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+#[serde(rename_all = "snake_case")]
|
|
|
|
|
+pub enum SectionName {
|
|
|
|
|
+ Variants,
|
|
|
|
|
+ VariantIndex,
|
|
|
|
|
+ CopyNumber,
|
|
|
|
|
+ BamQc,
|
|
|
|
|
+ PipeQc,
|
|
|
|
|
+ Methylation,
|
|
|
|
|
+ Fingerprint,
|
|
|
|
|
+ Provenance,
|
|
|
|
|
+ PublicIndex,
|
|
|
|
|
+ Other(String),
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+#[serde(rename_all = "snake_case")]
|
|
|
|
|
+pub enum SectionKind {
|
|
|
|
|
+ Bitcode,
|
|
|
|
|
+ MessagePack,
|
|
|
|
|
+ ArrowIpcFile,
|
|
|
|
|
+ RawBytes,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+#[serde(rename_all = "snake_case")]
|
|
|
|
|
+pub enum CompressionAlgorithm {
|
|
|
|
|
+ None,
|
|
|
|
|
+ Zstd,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct CompressionMetadata {
|
|
|
|
|
+ pub algorithm: CompressionAlgorithm,
|
|
|
|
|
+ pub level: Option<i32>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl Default for CompressionMetadata {
|
|
|
|
|
+ fn default() -> Self {
|
|
|
|
|
+ Self {
|
|
|
|
|
+ algorithm: CompressionAlgorithm::Zstd,
|
|
|
|
|
+ level: Some(3),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+#[serde(rename_all = "snake_case")]
|
|
|
|
|
+pub enum EncryptionAlgorithm {
|
|
|
|
|
+ None,
|
|
|
|
|
+ Aes256Gcm,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+#[serde(rename_all = "snake_case")]
|
|
|
|
|
+pub enum KeyDerivationAlgorithm {
|
|
|
|
|
+ None,
|
|
|
|
|
+ Argon2id,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct EncryptionMetadata {
|
|
|
|
|
+ pub algorithm: EncryptionAlgorithm,
|
|
|
|
|
+ pub key_derivation: KeyDerivationAlgorithm,
|
|
|
|
|
+ pub salt_b64: Option<String>,
|
|
|
|
|
+ pub aad_context: String,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl Default for EncryptionMetadata {
|
|
|
|
|
+ fn default() -> Self {
|
|
|
|
|
+ Self {
|
|
|
|
|
+ algorithm: EncryptionAlgorithm::None,
|
|
|
|
|
+ key_derivation: KeyDerivationAlgorithm::None,
|
|
|
|
|
+ salt_b64: None,
|
|
|
|
|
+ aad_context: "fixed-prelude + canonical-section-descriptor".to_string(),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct ProducerMetadata {
|
|
|
|
|
+ pub name: String,
|
|
|
|
|
+ pub pipeline: String,
|
|
|
|
|
+ pub pipeline_version: String,
|
|
|
|
|
+ pub git_commit: Option<String>,
|
|
|
|
|
+ pub created_at: DateTime<Utc>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct SampleMetadata {
|
|
|
|
|
+ pub sample_id: String,
|
|
|
|
|
+ pub tumor_timepoint: Option<String>,
|
|
|
|
|
+ pub normal_timepoint: Option<String>,
|
|
|
|
|
+ pub reference: Option<String>,
|
|
|
|
|
+ pub reference_digest: Option<String>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct SectionDescriptor {
|
|
|
|
|
+ pub name: SectionName,
|
|
|
|
|
+ pub kind: SectionKind,
|
|
|
|
|
+ pub compression: CompressionMetadata,
|
|
|
|
|
+ pub encryption: EncryptionAlgorithm,
|
|
|
|
|
+ pub offset: u64,
|
|
|
|
|
+ pub length: u64,
|
|
|
|
|
+ pub nonce_b64: Option<String>,
|
|
|
|
|
+ pub tag_b64: Option<String>,
|
|
|
|
|
+ pub checksum: String,
|
|
|
|
|
+ pub schema_hash: Option<String>,
|
|
|
|
|
+ pub required: bool,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct ContainerHeader {
|
|
|
|
|
+ pub format: String,
|
|
|
|
|
+ pub format_version: u16,
|
|
|
|
|
+ pub producer: ProducerMetadata,
|
|
|
|
|
+ pub sample: SampleMetadata,
|
|
|
|
|
+ pub compression: CompressionMetadata,
|
|
|
|
|
+ pub encryption: EncryptionMetadata,
|
|
|
|
|
+ pub sections: Vec<SectionDescriptor>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl ContainerHeader {
|
|
|
|
|
+ pub fn section(&self, name: &SectionName) -> Option<&SectionDescriptor> {
|
|
|
|
|
+ self.sections.iter().find(|section| §ion.name == name)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone)]
|
|
|
|
|
+pub struct PendingSection {
|
|
|
|
|
+ pub name: SectionName,
|
|
|
|
|
+ pub kind: SectionKind,
|
|
|
|
|
+ pub compression: CompressionMetadata,
|
|
|
|
|
+ pub schema_hash: Option<String>,
|
|
|
|
|
+ pub required: bool,
|
|
|
|
|
+ pub payload: Vec<u8>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl PendingSection {
|
|
|
|
|
+ pub fn new(name: SectionName, kind: SectionKind, payload: Vec<u8>) -> Self {
|
|
|
|
|
+ Self {
|
|
|
|
|
+ name,
|
|
|
|
|
+ kind,
|
|
|
|
|
+ compression: CompressionMetadata::default(),
|
|
|
|
|
+ schema_hash: None,
|
|
|
|
|
+ required: true,
|
|
|
|
|
+ payload,
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn optional(mut self) -> Self {
|
|
|
|
|
+ self.required = false;
|
|
|
|
|
+ self
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn with_schema_hash(mut self, schema_hash: impl Into<String>) -> Self {
|
|
|
|
|
+ self.schema_hash = Some(schema_hash.into());
|
|
|
|
|
+ self
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn with_compression(mut self, compression: CompressionMetadata) -> Self {
|
|
|
|
|
+ self.compression = compression;
|
|
|
|
|
+ self
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone)]
|
|
|
|
|
+pub struct DecodedSection {
|
|
|
|
|
+ pub descriptor: SectionDescriptor,
|
|
|
|
|
+ pub payload: Vec<u8>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn encode_header(header: &ContainerHeader) -> anyhow::Result<Vec<u8>> {
|
|
|
|
|
+ let packed = rmp_serde::to_vec_named(header)?;
|
|
|
|
|
+ zstd::bulk::compress(&packed, header.compression.level.unwrap_or(3))
|
|
|
|
|
+ .context("failed to zstd-compress .pandora header")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_header(bytes: &[u8]) -> anyhow::Result<ContainerHeader> {
|
|
|
|
|
+ let unpacked = zstd::bulk::decompress(bytes, usize::MAX)
|
|
|
|
|
+ .context("failed to decompress .pandora header")?;
|
|
|
|
|
+ rmp_serde::from_slice(&unpacked).context("failed to decode .pandora header")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn write_container(
|
|
|
|
|
+ path: impl AsRef<Path>,
|
|
|
|
|
+ mut header: ContainerHeader,
|
|
|
|
|
+ sections: Vec<PendingSection>,
|
|
|
|
|
+) -> anyhow::Result<()> {
|
|
|
|
|
+ if header.format_version != PANDORA_FORMAT_VERSION {
|
|
|
|
|
+ bail!(
|
|
|
|
|
+ "unsupported .pandora format version for writer: {}",
|
|
|
|
|
+ header.format_version
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ if matches!(header.encryption.algorithm, EncryptionAlgorithm::Aes256Gcm) {
|
|
|
|
|
+ bail!("encrypted .pandora writing is not implemented yet");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let stored_sections = sections
|
|
|
|
|
+ .into_iter()
|
|
|
|
|
+ .map(encode_pending_section)
|
|
|
|
|
+ .collect::<anyhow::Result<Vec<_>>>()?;
|
|
|
|
|
+
|
|
|
|
|
+ let (header_bytes, descriptors) = finalize_header_sections(&header, &stored_sections)?;
|
|
|
|
|
+ header.sections = descriptors;
|
|
|
|
|
+ let header_bytes = encode_header(&header)?;
|
|
|
|
|
+ let header_checksum = blake3::hash(&header_bytes);
|
|
|
|
|
+
|
|
|
|
|
+ let prelude = ContainerPrelude {
|
|
|
|
|
+ header_len: header_bytes.len() as u64,
|
|
|
|
|
+ header_checksum: *header_checksum.as_bytes(),
|
|
|
|
|
+ ..Default::default()
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ let mut writer = File::create(path.as_ref())
|
|
|
|
|
+ .with_context(|| format!("failed to create {}", path.as_ref().display()))?;
|
|
|
|
|
+ prelude.write_to(&mut writer)?;
|
|
|
|
|
+ writer.write_all(&header_bytes)?;
|
|
|
|
|
+ for stored in stored_sections {
|
|
|
|
|
+ writer.write_all(&stored.payload)?;
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn read_header(path: impl AsRef<Path>) -> anyhow::Result<(ContainerPrelude, ContainerHeader)> {
|
|
|
|
|
+ let mut reader = File::open(path.as_ref())
|
|
|
|
|
+ .with_context(|| format!("failed to open {}", path.as_ref().display()))?;
|
|
|
|
|
+ let prelude = ContainerPrelude::from_reader(&mut reader)?;
|
|
|
|
|
+ validate_prelude(&prelude)?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut header_bytes = vec![0; prelude.header_len as usize];
|
|
|
|
|
+ reader.read_exact(&mut header_bytes)?;
|
|
|
|
|
+ verify_checksum(&header_bytes, &prelude.header_checksum, "header")?;
|
|
|
|
|
+
|
|
|
|
|
+ let header = decode_header(&header_bytes)?;
|
|
|
|
|
+ validate_header(&header)?;
|
|
|
|
|
+ Ok((prelude, header))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn read_section(
|
|
|
|
|
+ path: impl AsRef<Path>,
|
|
|
|
|
+ name: &SectionName,
|
|
|
|
|
+) -> anyhow::Result<Option<DecodedSection>> {
|
|
|
|
|
+ let (_, header) = read_header(path.as_ref())?;
|
|
|
|
|
+ let Some(descriptor) = header.section(name).cloned() else {
|
|
|
|
|
+ return Ok(None);
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ let mut reader = File::open(path.as_ref())
|
|
|
|
|
+ .with_context(|| format!("failed to open {}", path.as_ref().display()))?;
|
|
|
|
|
+ reader.seek(SeekFrom::Start(descriptor.offset))?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut stored = vec![0; descriptor.length as usize];
|
|
|
|
|
+ reader.read_exact(&mut stored)?;
|
|
|
|
|
+ verify_prefixed_checksum(
|
|
|
|
|
+ &stored,
|
|
|
|
|
+ &descriptor.checksum,
|
|
|
|
|
+ &format!("{:?}", descriptor.name),
|
|
|
|
|
+ )?;
|
|
|
|
|
+
|
|
|
|
|
+ if matches!(descriptor.encryption, EncryptionAlgorithm::Aes256Gcm) {
|
|
|
|
|
+ bail!("encrypted .pandora sections are not implemented yet");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let payload = decode_section_payload(&stored, &descriptor.compression)?;
|
|
|
|
|
+ Ok(Some(DecodedSection {
|
|
|
|
|
+ descriptor,
|
|
|
|
|
+ payload,
|
|
|
|
|
+ }))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn read_required_section(
|
|
|
|
|
+ path: impl AsRef<Path>,
|
|
|
|
|
+ name: &SectionName,
|
|
|
|
|
+) -> anyhow::Result<DecodedSection> {
|
|
|
|
|
+ read_section(path, name)?.with_context(|| format!("missing required section: {name:?}"))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn variants_section(variants: &Variants) -> PendingSection {
|
|
|
|
|
+ PendingSection::new(
|
|
|
|
|
+ SectionName::Variants,
|
|
|
|
|
+ SectionKind::Bitcode,
|
|
|
|
|
+ bitcode::encode(variants),
|
|
|
|
|
+ )
|
|
|
|
|
+ .with_schema_hash(logical_schema_hash("Variants"))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_variants_section(section: &DecodedSection) -> anyhow::Result<Variants> {
|
|
|
|
|
+ ensure_section_kind(section, &SectionName::Variants, SectionKind::Bitcode)?;
|
|
|
|
|
+ bitcode::decode(§ion.payload).context("failed to decode Variants bitcode payload")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn copy_number_section(copy_number: &SavanaCN) -> anyhow::Result<PendingSection> {
|
|
|
|
|
+ message_pack_section(
|
|
|
|
|
+ SectionName::CopyNumber,
|
|
|
|
|
+ copy_number,
|
|
|
|
|
+ logical_schema_hash("SavanaCN"),
|
|
|
|
|
+ )
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_copy_number_section(section: &DecodedSection) -> anyhow::Result<SavanaCN> {
|
|
|
|
|
+ ensure_section_kind(section, &SectionName::CopyNumber, SectionKind::MessagePack)?;
|
|
|
|
|
+ decode_message_pack_payload(§ion.payload, "SavanaCN")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn bam_qc_section(payload: &BamQcPayload) -> anyhow::Result<PendingSection> {
|
|
|
|
|
+ message_pack_section(
|
|
|
|
|
+ SectionName::BamQc,
|
|
|
|
|
+ payload,
|
|
|
|
|
+ logical_schema_hash("BamQcPayload"),
|
|
|
|
|
+ )
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_bam_qc_section(section: &DecodedSection) -> anyhow::Result<BamQcPayload> {
|
|
|
|
|
+ ensure_section_kind(section, &SectionName::BamQc, SectionKind::MessagePack)?;
|
|
|
|
|
+ decode_message_pack_payload(§ion.payload, "BamQcPayload")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn pipe_qc_section(payload: &PipeQcPayload) -> anyhow::Result<PendingSection> {
|
|
|
|
|
+ message_pack_section(
|
|
|
|
|
+ SectionName::PipeQc,
|
|
|
|
|
+ payload,
|
|
|
|
|
+ logical_schema_hash("PipeQcPayload"),
|
|
|
|
|
+ )
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_pipe_qc_section(section: &DecodedSection) -> anyhow::Result<PipeQcPayload> {
|
|
|
|
|
+ ensure_section_kind(section, &SectionName::PipeQc, SectionKind::MessagePack)?;
|
|
|
|
|
+ decode_message_pack_payload(§ion.payload, "PipeQcPayload")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn provenance_section(payload: &ProvenancePayload) -> anyhow::Result<PendingSection> {
|
|
|
|
|
+ message_pack_section(
|
|
|
|
|
+ SectionName::Provenance,
|
|
|
|
|
+ payload,
|
|
|
|
|
+ logical_schema_hash("ProvenancePayload"),
|
|
|
|
|
+ )
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+pub fn decode_provenance_section(section: &DecodedSection) -> anyhow::Result<ProvenancePayload> {
|
|
|
|
|
+ ensure_section_kind(section, &SectionName::Provenance, SectionKind::MessagePack)?;
|
|
|
|
|
+ decode_message_pack_payload(§ion.payload, "ProvenancePayload")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn message_pack_section<T: Serialize>(
|
|
|
|
|
+ name: SectionName,
|
|
|
|
|
+ payload: &T,
|
|
|
|
|
+ schema_hash: String,
|
|
|
|
|
+) -> anyhow::Result<PendingSection> {
|
|
|
|
|
+ let encoded = rmp_serde::to_vec_named(payload)
|
|
|
|
|
+ .with_context(|| format!("failed to encode {name:?} MessagePack payload"))?;
|
|
|
|
|
+ Ok(PendingSection::new(name, SectionKind::MessagePack, encoded).with_schema_hash(schema_hash))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn decode_message_pack_payload<T: DeserializeOwned>(
|
|
|
|
|
+ bytes: &[u8],
|
|
|
|
|
+ label: &str,
|
|
|
|
|
+) -> anyhow::Result<T> {
|
|
|
|
|
+ rmp_serde::from_slice(bytes)
|
|
|
|
|
+ .with_context(|| format!("failed to decode {label} MessagePack payload"))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn ensure_section_kind(
|
|
|
|
|
+ section: &DecodedSection,
|
|
|
|
|
+ expected_name: &SectionName,
|
|
|
|
|
+ expected_kind: SectionKind,
|
|
|
|
|
+) -> anyhow::Result<()> {
|
|
|
|
|
+ if §ion.descriptor.name != expected_name {
|
|
|
|
|
+ bail!(
|
|
|
|
|
+ "expected section {expected_name:?}, found {:?}",
|
|
|
|
|
+ section.descriptor.name
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ if section.descriptor.kind != expected_kind {
|
|
|
|
|
+ bail!(
|
|
|
|
|
+ "expected section kind {expected_kind:?}, found {:?}",
|
|
|
|
|
+ section.descriptor.kind
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn logical_schema_hash(name: &str) -> String {
|
|
|
|
|
+ format!("blake3:{}", blake3::hash(name.as_bytes()).to_hex())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn validate_prelude(prelude: &ContainerPrelude) -> anyhow::Result<()> {
|
|
|
|
|
+ if &prelude.magic != PANDORA_MAGIC {
|
|
|
|
|
+ bail!("invalid .pandora magic");
|
|
|
|
|
+ }
|
|
|
|
|
+ if prelude.version != PANDORA_FORMAT_VERSION {
|
|
|
|
|
+ bail!("unsupported .pandora format version: {}", prelude.version);
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn validate_header(header: &ContainerHeader) -> anyhow::Result<()> {
|
|
|
|
|
+ if header.format != "somaticpipe.output" {
|
|
|
|
|
+ bail!("invalid .pandora header format: {}", header.format);
|
|
|
|
|
+ }
|
|
|
|
|
+ if header.format_version != PANDORA_FORMAT_VERSION {
|
|
|
|
|
+ bail!(
|
|
|
|
|
+ "unsupported .pandora header version: {}",
|
|
|
|
|
+ header.format_version
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn verify_checksum(bytes: &[u8], expected: &[u8; 32], label: &str) -> anyhow::Result<()> {
|
|
|
|
|
+ let actual = blake3::hash(bytes);
|
|
|
|
|
+ if actual.as_bytes() != expected {
|
|
|
|
|
+ bail!("{label} checksum mismatch");
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn verify_prefixed_checksum(bytes: &[u8], expected: &str, label: &str) -> anyhow::Result<()> {
|
|
|
|
|
+ let Some(hex_digest) = expected.strip_prefix("blake3:") else {
|
|
|
|
|
+ bail!("{label} checksum does not use blake3 prefix");
|
|
|
|
|
+ };
|
|
|
|
|
+ let actual = blake3::hash(bytes).to_hex().to_string();
|
|
|
|
|
+ if actual != hex_digest {
|
|
|
|
|
+ bail!("{label} checksum mismatch");
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone)]
|
|
|
|
|
+struct StoredSection {
|
|
|
|
|
+ descriptor: SectionDescriptor,
|
|
|
|
|
+ payload: Vec<u8>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn encode_pending_section(section: PendingSection) -> anyhow::Result<StoredSection> {
|
|
|
|
|
+ let payload = encode_section_payload(§ion.payload, §ion.compression)?;
|
|
|
|
|
+ let checksum = format!("blake3:{}", blake3::hash(&payload).to_hex());
|
|
|
|
|
+
|
|
|
|
|
+ Ok(StoredSection {
|
|
|
|
|
+ descriptor: SectionDescriptor {
|
|
|
|
|
+ name: section.name,
|
|
|
|
|
+ kind: section.kind,
|
|
|
|
|
+ compression: section.compression,
|
|
|
|
|
+ encryption: EncryptionAlgorithm::None,
|
|
|
|
|
+ offset: 0,
|
|
|
|
|
+ length: payload.len() as u64,
|
|
|
|
|
+ nonce_b64: None,
|
|
|
|
|
+ tag_b64: None,
|
|
|
|
|
+ checksum,
|
|
|
|
|
+ schema_hash: section.schema_hash,
|
|
|
|
|
+ required: section.required,
|
|
|
|
|
+ },
|
|
|
|
|
+ payload,
|
|
|
|
|
+ })
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn encode_section_payload(
|
|
|
|
|
+ bytes: &[u8],
|
|
|
|
|
+ compression: &CompressionMetadata,
|
|
|
|
|
+) -> anyhow::Result<Vec<u8>> {
|
|
|
|
|
+ match compression.algorithm {
|
|
|
|
|
+ CompressionAlgorithm::None => Ok(bytes.to_vec()),
|
|
|
|
|
+ CompressionAlgorithm::Zstd => zstd::bulk::compress(bytes, compression.level.unwrap_or(3))
|
|
|
|
|
+ .context("failed to zstd-compress .pandora section"),
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn decode_section_payload(
|
|
|
|
|
+ bytes: &[u8],
|
|
|
|
|
+ compression: &CompressionMetadata,
|
|
|
|
|
+) -> anyhow::Result<Vec<u8>> {
|
|
|
|
|
+ match compression.algorithm {
|
|
|
|
|
+ CompressionAlgorithm::None => Ok(bytes.to_vec()),
|
|
|
|
|
+ CompressionAlgorithm::Zstd => zstd::bulk::decompress(bytes, usize::MAX)
|
|
|
|
|
+ .context("failed to zstd-decompress .pandora section"),
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn finalize_header_sections(
|
|
|
|
|
+ header: &ContainerHeader,
|
|
|
|
|
+ stored_sections: &[StoredSection],
|
|
|
|
|
+) -> anyhow::Result<(Vec<u8>, Vec<SectionDescriptor>)> {
|
|
|
|
|
+ let mut descriptors: Vec<SectionDescriptor> = stored_sections
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .map(|stored| stored.descriptor.clone())
|
|
|
|
|
+ .collect();
|
|
|
|
|
+
|
|
|
|
|
+ let mut previous_header_len = None;
|
|
|
|
|
+ for _ in 0..16 {
|
|
|
|
|
+ let mut probe = header.clone();
|
|
|
|
|
+ probe.sections = descriptors.clone();
|
|
|
|
|
+ let header_bytes = encode_header(&probe)?;
|
|
|
|
|
+ let mut offset = (PANDORA_PRELUDE_LEN + header_bytes.len()) as u64;
|
|
|
|
|
+
|
|
|
|
|
+ for (descriptor, stored) in descriptors.iter_mut().zip(stored_sections) {
|
|
|
|
|
+ descriptor.offset = offset;
|
|
|
|
|
+ descriptor.length = stored.payload.len() as u64;
|
|
|
|
|
+ offset += descriptor.length;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if previous_header_len == Some(header_bytes.len()) {
|
|
|
|
|
+ let mut final_header = header.clone();
|
|
|
|
|
+ final_header.sections = descriptors.clone();
|
|
|
|
|
+ return Ok((encode_header(&final_header)?, descriptors));
|
|
|
|
|
+ }
|
|
|
|
|
+ previous_header_len = Some(header_bytes.len());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ bail!("failed to stabilize .pandora header length after offset assignment")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
|
+pub struct BamQcPayload {
|
|
|
|
|
+ pub by_role: BTreeMap<String, WGSBamStats>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Serialize, Deserialize)]
|
|
|
|
|
+pub struct PipeQcPayload {
|
|
|
|
|
+ pub somatic_pipe_stats: SomaticPipeStats,
|
|
|
|
|
+ pub variant_stats: Option<VariantsStats>,
|
|
|
|
|
+ pub annotation_stats: BTreeMap<String, AnnotationsStats>,
|
|
|
|
|
+ pub vep_stats: Option<VepStats>,
|
|
|
|
|
+ pub caller_outputs: Vec<CallerOutputSummary>,
|
|
|
|
|
+ pub filter_steps: Vec<FilterStepSummary>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct CallerOutputSummary {
|
|
|
|
|
+ pub caller: String,
|
|
|
|
|
+ pub n_input: usize,
|
|
|
|
|
+ pub n_after_filters: Option<usize>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct FilterStepSummary {
|
|
|
|
|
+ pub name: String,
|
|
|
|
|
+ pub removed: usize,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
+pub struct ProvenancePayload {
|
|
|
|
|
+ pub config_digest: Option<String>,
|
|
|
|
|
+ pub input_digests: BTreeMap<String, String>,
|
|
|
|
|
+ pub tool_versions: BTreeMap<String, String>,
|
|
|
|
|
+ pub command_lines: Vec<String>,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[cfg(test)]
|
|
|
|
|
+mod tests {
|
|
|
|
|
+ use super::*;
|
|
|
|
|
+
|
|
|
|
|
+ fn test_header() -> ContainerHeader {
|
|
|
|
|
+ ContainerHeader {
|
|
|
|
|
+ format: "somaticpipe.output".to_string(),
|
|
|
|
|
+ format_version: PANDORA_FORMAT_VERSION,
|
|
|
|
|
+ producer: ProducerMetadata {
|
|
|
|
|
+ name: "pandora_lib_promethion".to_string(),
|
|
|
|
|
+ pipeline: "SomaticPipe".to_string(),
|
|
|
|
|
+ pipeline_version: "0.1.0".to_string(),
|
|
|
|
|
+ git_commit: None,
|
|
|
|
|
+ created_at: Utc::now(),
|
|
|
|
|
+ },
|
|
|
|
|
+ sample: SampleMetadata {
|
|
|
|
|
+ sample_id: "sample_001".to_string(),
|
|
|
|
|
+ tumor_timepoint: Some("diag".to_string()),
|
|
|
|
|
+ normal_timepoint: Some("constit".to_string()),
|
|
|
|
|
+ reference: Some("hs1".to_string()),
|
|
|
|
|
+ reference_digest: None,
|
|
|
|
|
+ },
|
|
|
|
|
+ compression: CompressionMetadata::default(),
|
|
|
|
|
+ encryption: EncryptionMetadata::default(),
|
|
|
|
|
+ sections: Vec::new(),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn writes_and_reads_raw_section() -> anyhow::Result<()> {
|
|
|
|
|
+ let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
|
|
|
|
|
+ let payload = b"hello pandora".to_vec();
|
|
|
|
|
+ let section = PendingSection::new(
|
|
|
|
|
+ SectionName::Provenance,
|
|
|
|
|
+ SectionKind::RawBytes,
|
|
|
|
|
+ payload.clone(),
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ write_container(&path, test_header(), vec![section])?;
|
|
|
|
|
+ let read = read_required_section(&path, &SectionName::Provenance)?;
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(read.payload, payload);
|
|
|
|
|
+ assert_eq!(read.descriptor.name, SectionName::Provenance);
|
|
|
|
|
+
|
|
|
|
|
+ std::fs::remove_file(path)?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn rejects_bad_magic() -> anyhow::Result<()> {
|
|
|
|
|
+ let bytes = [0u8; PANDORA_PRELUDE_LEN];
|
|
|
|
|
+ let prelude = ContainerPrelude::from_reader(&bytes[..])?;
|
|
|
|
|
+ assert!(validate_prelude(&prelude).is_err());
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+}
|