瀏覽代碼

Add block-compressed Pandora variant index

STEIMLE Thomas 2 周之前
父節點
當前提交
52d9e6486b
共有 2 個文件被更改,包括 834 次插入9 次删除
  1. 827 6
      src/io/somaticpipe_container.rs
  2. 7 3
      src/pipes/somatic.rs

+ 827 - 6
src/io/somaticpipe_container.rs

@@ -8,25 +8,38 @@ use std::{
     fs::File,
     io::{Cursor, Read, Seek, SeekFrom, Write},
     path::{Path, PathBuf},
+    sync::Arc,
 };
 
 use anyhow::{bail, Context};
+use arrow::{
+    array::{
+        ArrayRef, BooleanArray, Float32Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
+        UInt8Array,
+    },
+    datatypes::{DataType, Field, Schema},
+    ipc::writer::FileWriter as ArrowFileWriter,
+};
 use chrono::{DateTime, Utc};
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    annotation::{AnnotationsStats, VepStats},
+    annotation::{Annotation, AnnotationsStats, VepStats},
     callers::savana::SavanaCN,
     collection::bam_stats::WGSBamStats,
     pipes::somatic::SomaticPipeStats,
-    variant::{variant_collection::Variants, variants_stats::VariantsStats},
+    variant::{
+        variant_collection::{Variant, Variants},
+        variants_stats::VariantsStats,
+    },
 };
 
 pub const PANDORA_MAGIC: &[u8; 8] = b"PANDORA\0";
 pub const PANDORA_FORMAT_VERSION: u16 = 1;
 pub const PANDORA_EXTENSION: &str = "pandora";
 pub const PANDORA_PRELUDE_LEN: usize = 8 + 2 + 8 + 32;
+pub const VARIANT_RECORD_BLOCK_TARGET_UNCOMPRESSED_SIZE: usize = 4 * 1024 * 1024;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct ContainerPrelude {
@@ -82,6 +95,7 @@ impl ContainerPrelude {
 #[serde(rename_all = "snake_case")]
 pub enum SectionName {
     Variants,
+    VariantRecords,
     VariantIndex,
     CopyNumber,
     BamQc,
@@ -251,6 +265,16 @@ pub struct DecodedSection {
     pub payload: Vec<u8>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct VariantRecordPointer {
+    pub variant_idx: u64,
+    pub block_offset: u64,
+    pub block_length: u64,
+    pub block_uncompressed_length: u64,
+    pub record_offset_in_block: u64,
+    pub record_length_in_block: u64,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct PandoraSummary {
     pub prelude: ContainerPrelude,
@@ -268,6 +292,51 @@ pub struct PandoraSectionSummary {
     pub required: bool,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PandoraManifest {
+    pub path: PathBuf,
+    pub sample_id: String,
+    pub pipeline: String,
+    pub pipeline_version: String,
+    pub created_at: DateTime<Utc>,
+    pub format_version: u16,
+    pub valid: bool,
+    pub warnings: Vec<String>,
+    pub errors: Vec<String>,
+    pub sections: Vec<PandoraManifestSection>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PandoraManifestSection {
+    pub name: SectionName,
+    pub kind: SectionKind,
+    pub offset: u64,
+    pub length: u64,
+    pub checksum: String,
+    pub required: bool,
+    pub readable: Option<bool>,
+    pub checksum_valid: Option<bool>,
+    pub decompresses: Option<bool>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PandoraValidationReport {
+    pub path: PathBuf,
+    pub valid: bool,
+    pub errors: Vec<String>,
+    pub warnings: Vec<String>,
+    pub sections: Vec<PandoraSectionValidation>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PandoraSectionValidation {
+    pub name: SectionName,
+    pub readable: bool,
+    pub checksum_valid: bool,
+    pub decompresses: bool,
+    pub error: Option<String>,
+}
+
 #[derive(Debug, Clone)]
 pub struct PandoraReader {
     path: PathBuf,
@@ -323,6 +392,30 @@ impl PandoraReader {
         decode_variants_section(&section)
     }
 
+    pub fn variant_index_arrow(&self) -> anyhow::Result<Option<Vec<u8>>> {
+        self.read_section(&SectionName::VariantIndex)?
+            .map(|section| {
+                ensure_section_kind(
+                    &section,
+                    &SectionName::VariantIndex,
+                    SectionKind::ArrowIpcFile,
+                )?;
+                Ok(section.payload)
+            })
+            .transpose()
+    }
+
+    pub fn read_variant_record(&self, pointer: &VariantRecordPointer) -> anyhow::Result<Variant> {
+        read_variant_record_from_path(&self.path, &self.header, pointer)
+    }
+
+    pub fn read_variant_records(
+        &self,
+        pointers: &[VariantRecordPointer],
+    ) -> anyhow::Result<Vec<Variant>> {
+        read_variant_records_from_path(&self.path, &self.header, pointers)
+    }
+
     pub fn copy_number(&self) -> anyhow::Result<Option<SavanaCN>> {
         self.read_section(&SectionName::CopyNumber)?
             .map(|section| decode_copy_number_section(&section))
@@ -386,6 +479,101 @@ pub fn read_pandora_summary(path: impl AsRef<Path>) -> anyhow::Result<PandoraSum
     Ok(PandoraSummary::from_parts(prelude, header))
 }
 
+pub fn pandora_manifest(path: impl AsRef<Path>) -> anyhow::Result<PandoraManifest> {
+    let path = path.as_ref().to_path_buf();
+    let summary = read_pandora_summary(&path)?;
+    let validation = validate_pandora(&path);
+
+    let sections = summary
+        .sections
+        .into_iter()
+        .map(|section| {
+            let section_validation = validation
+                .sections
+                .iter()
+                .find(|validated| validated.name == section.name);
+            PandoraManifestSection {
+                name: section.name,
+                kind: section.kind,
+                offset: section.offset,
+                length: section.length,
+                checksum: section.checksum,
+                required: section.required,
+                readable: section_validation.map(|validated| validated.readable),
+                checksum_valid: section_validation.map(|validated| validated.checksum_valid),
+                decompresses: section_validation.map(|validated| validated.decompresses),
+            }
+        })
+        .collect();
+
+    Ok(PandoraManifest {
+        path,
+        sample_id: summary.header.sample.sample_id,
+        pipeline: summary.header.producer.pipeline,
+        pipeline_version: summary.header.producer.pipeline_version,
+        created_at: summary.header.producer.created_at,
+        format_version: summary.header.format_version,
+        valid: validation.valid,
+        warnings: validation.warnings,
+        errors: validation.errors,
+        sections,
+    })
+}
+
+pub fn validate_pandora(path: impl AsRef<Path>) -> PandoraValidationReport {
+    let path = path.as_ref().to_path_buf();
+    let mut report = PandoraValidationReport {
+        path: path.clone(),
+        valid: false,
+        errors: Vec::new(),
+        warnings: Vec::new(),
+        sections: Vec::new(),
+    };
+
+    let (_, header) = match read_header(&path) {
+        Ok(value) => value,
+        Err(err) => {
+            report.errors.push(err.to_string());
+            return report;
+        }
+    };
+
+    for required in [
+        SectionName::Variants,
+        SectionName::PipeQc,
+        SectionName::Provenance,
+    ] {
+        if header.section(&required).is_none() {
+            report
+                .warnings
+                .push(format!("missing canonical section: {required:?}"));
+        }
+    }
+
+    let mut reader = match File::open(&path) {
+        Ok(reader) => reader,
+        Err(err) => {
+            report
+                .errors
+                .push(format!("failed to reopen {}: {err}", path.display()));
+            return report;
+        }
+    };
+
+    for descriptor in &header.sections {
+        let validation = validate_section(&mut reader, descriptor);
+        if let Some(error) = &validation.error {
+            report
+                .errors
+                .push(format!("{:?}: {error}", validation.name));
+        }
+        report.sections.push(validation);
+    }
+
+    report.valid = report.errors.is_empty();
+    report
+}
+
 pub fn encode_header(header: &ContainerHeader) -> anyhow::Result<Vec<u8>> {
     let packed = rmp_serde::to_vec_named(header)?;
     zstd::bulk::compress(&packed, header.compression.level.unwrap_or(3))
@@ -493,6 +681,55 @@ fn read_described_section(
     })
 }
 
+fn validate_section(reader: &mut File, descriptor: &SectionDescriptor) -> PandoraSectionValidation {
+    let mut validation = PandoraSectionValidation {
+        name: descriptor.name.clone(),
+        readable: false,
+        checksum_valid: false,
+        decompresses: false,
+        error: None,
+    };
+
+    if let Err(err) = reader.seek(SeekFrom::Start(descriptor.offset)) {
+        validation.error = Some(format!("failed to seek section: {err}"));
+        return validation;
+    }
+
+    let Ok(length) = usize::try_from(descriptor.length) else {
+        validation.error = Some(format!("section length too large: {}", descriptor.length));
+        return validation;
+    };
+
+    let mut stored = vec![0; length];
+    if let Err(err) = reader.read_exact(&mut stored) {
+        validation.error = Some(format!("failed to read section payload: {err}"));
+        return validation;
+    }
+    validation.readable = true;
+
+    if let Err(err) = verify_prefixed_checksum(
+        &stored,
+        &descriptor.checksum,
+        &format!("{:?}", descriptor.name),
+    ) {
+        validation.error = Some(err.to_string());
+        return validation;
+    }
+    validation.checksum_valid = true;
+
+    if matches!(descriptor.encryption, EncryptionAlgorithm::Aes256Gcm) {
+        validation.error = Some("encrypted .pandora sections are not implemented yet".to_string());
+        return validation;
+    }
+
+    if let Err(err) = decode_section_payload(&stored, &descriptor.compression) {
+        validation.error = Some(err.to_string());
+        return validation;
+    }
+    validation.decompresses = true;
+    validation
+}
+
 pub fn read_required_section(
     path: impl AsRef<Path>,
     name: &SectionName,
@@ -509,6 +746,37 @@ pub fn variants_section(variants: &Variants) -> PendingSection {
     .with_schema_hash(logical_schema_hash("Variants"))
 }
 
+pub fn variant_records_and_index_sections(
+    variants: &Variants,
+) -> anyhow::Result<(PendingSection, PendingSection)> {
+    let (records_payload, pointers) = encode_variant_record_blocks(
+        variants,
+        VARIANT_RECORD_BLOCK_TARGET_UNCOMPRESSED_SIZE,
+        CompressionMetadata::default(),
+    )?;
+    let index_payload = encode_variant_index_arrow(variants, &pointers)?;
+
+    let records_section = PendingSection::new(
+        SectionName::VariantRecords,
+        SectionKind::RawBytes,
+        records_payload,
+    )
+    .with_compression(CompressionMetadata {
+        algorithm: CompressionAlgorithm::None,
+        level: None,
+    })
+    .with_schema_hash(logical_schema_hash("VariantRecordBlocksV1"));
+
+    let index_section = PendingSection::new(
+        SectionName::VariantIndex,
+        SectionKind::ArrowIpcFile,
+        index_payload,
+    )
+    .with_schema_hash(logical_schema_hash("VariantIndexV1"));
+
+    Ok((records_section, index_section))
+}
+
 pub fn decode_variants_section(section: &DecodedSection) -> anyhow::Result<Variants> {
     ensure_section_kind(section, &SectionName::Variants, SectionKind::Bitcode)?;
     bitcode::decode(&section.payload).context("failed to decode Variants bitcode payload")
@@ -566,6 +834,500 @@ pub fn decode_provenance_section(section: &DecodedSection) -> anyhow::Result<Pro
     decode_message_pack_payload(&section.payload, "ProvenancePayload")
 }
 
+fn encode_variant_record_blocks(
+    variants: &Variants,
+    target_uncompressed_block_size: usize,
+    block_compression: CompressionMetadata,
+) -> anyhow::Result<(Vec<u8>, Vec<VariantRecordPointer>)> {
+    let mut payload = Vec::new();
+    let mut pointers = Vec::with_capacity(variants.data.len());
+    let mut current_block = Vec::new();
+    let mut current_records: Vec<(u64, u64, u64)> = Vec::new();
+
+    for (variant_idx, variant) in variants.data.iter().enumerate() {
+        let encoded = bitcode::encode(variant);
+
+        if !current_block.is_empty()
+            && current_block.len() + encoded.len() > target_uncompressed_block_size
+        {
+            flush_variant_record_block(
+                &mut payload,
+                &mut pointers,
+                &mut current_block,
+                &mut current_records,
+                &block_compression,
+            )?;
+        }
+
+        let record_offset_in_block = current_block.len() as u64;
+        current_block.extend_from_slice(&encoded);
+        current_records.push((
+            variant_idx as u64,
+            record_offset_in_block,
+            encoded.len() as u64,
+        ));
+    }
+
+    flush_variant_record_block(
+        &mut payload,
+        &mut pointers,
+        &mut current_block,
+        &mut current_records,
+        &block_compression,
+    )?;
+
+    Ok((payload, pointers))
+}
+
+fn flush_variant_record_block(
+    payload: &mut Vec<u8>,
+    pointers: &mut Vec<VariantRecordPointer>,
+    current_block: &mut Vec<u8>,
+    current_records: &mut Vec<(u64, u64, u64)>,
+    block_compression: &CompressionMetadata,
+) -> anyhow::Result<()> {
+    if current_block.is_empty() {
+        return Ok(());
+    }
+
+    let block_offset = payload.len() as u64;
+    let block_uncompressed_length = current_block.len() as u64;
+    let encoded_block = encode_section_payload(current_block, block_compression)?;
+    let block_length = encoded_block.len() as u64;
+    payload.extend_from_slice(&encoded_block);
+
+    for (variant_idx, record_offset_in_block, record_length_in_block) in current_records.drain(..) {
+        pointers.push(VariantRecordPointer {
+            variant_idx,
+            block_offset,
+            block_length,
+            block_uncompressed_length,
+            record_offset_in_block,
+            record_length_in_block,
+        });
+    }
+
+    current_block.clear();
+    Ok(())
+}
+
+fn encode_variant_index_arrow(
+    variants: &Variants,
+    pointers: &[VariantRecordPointer],
+) -> anyhow::Result<Vec<u8>> {
+    let rows = variants
+        .data
+        .iter()
+        .zip(pointers)
+        .map(|(variant, pointer)| VariantIndexRow::from_variant(variant, pointer))
+        .collect::<Vec<_>>();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("variant_idx", DataType::UInt64, false),
+        Field::new("block_offset", DataType::UInt64, false),
+        Field::new("block_length", DataType::UInt64, false),
+        Field::new("block_uncompressed_length", DataType::UInt64, false),
+        Field::new("record_offset_in_block", DataType::UInt64, false),
+        Field::new("record_length_in_block", DataType::UInt64, false),
+        Field::new("contig", DataType::Utf8, false),
+        Field::new("contig_id", DataType::UInt8, false),
+        Field::new("position", DataType::UInt32, false),
+        Field::new("reference", DataType::Utf8, false),
+        Field::new("alternative", DataType::Utf8, false),
+        Field::new("variant_hash", DataType::Utf8, false),
+        Field::new("n_callers", DataType::UInt8, false),
+        Field::new("callers", DataType::Utf8, false),
+        Field::new("variant_categories", DataType::Utf8, false),
+        Field::new("quality", DataType::Float32, true),
+        Field::new("filter", DataType::Utf8, true),
+        Field::new("gene", DataType::Utf8, true),
+        Field::new("ensembl_feature", DataType::Utf8, true),
+        Field::new("impact", DataType::Utf8, true),
+        Field::new("consequence", DataType::Utf8, true),
+        Field::new("hgvs", DataType::Utf8, true),
+        Field::new("panel", DataType::Utf8, true),
+        Field::new("has_cosmic", DataType::Boolean, false),
+        Field::new("has_gnomad", DataType::Boolean, false),
+        Field::new("is_cpg", DataType::Boolean, false),
+        Field::new("is_repeat", DataType::Boolean, false),
+        Field::new("is_vntr", DataType::Boolean, false),
+        Field::new("low_entropy", DataType::Boolean, false),
+        Field::new("low_mapq", DataType::Boolean, false),
+        Field::new("high_depth", DataType::Boolean, false),
+    ]));
+
+    let columns: Vec<ArrayRef> = vec![
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.variant_idx)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.block_offset)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.block_length)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.block_uncompressed_length)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.record_offset_in_block)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt64Array::from(
+            rows.iter()
+                .map(|row| row.pointer.record_length_in_block)
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.contig.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt8Array::from(
+            rows.iter().map(|row| row.contig_id).collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt32Array::from(
+            rows.iter().map(|row| row.position).collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.reference.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.alternative.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.variant_hash.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt8Array::from(
+            rows.iter().map(|row| row.n_callers).collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.callers.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.variant_categories.as_str())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(Float32Array::from(
+            rows.iter().map(|row| row.quality).collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.filter.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.gene.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.ensembl_feature.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.impact.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.consequence.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.hgvs.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| row.panel.as_deref())
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.has_cosmic).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.has_gnomad).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.is_cpg).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.is_repeat).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.is_vntr).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.low_entropy).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.low_mapq).collect::<Vec<_>>(),
+        )),
+        Arc::new(BooleanArray::from(
+            rows.iter().map(|row| row.high_depth).collect::<Vec<_>>(),
+        )),
+    ];
+
+    let batch = RecordBatch::try_new(schema.clone(), columns)
+        .context("failed to build variant index Arrow record batch")?;
+
+    let mut payload = Vec::new();
+    let mut writer = ArrowFileWriter::try_new(&mut payload, &schema)
+        .context("failed to create variant index Arrow IPC writer")?;
+    writer
+        .write(&batch)
+        .context("failed to write variant index Arrow IPC batch")?;
+    writer
+        .finish()
+        .context("failed to finish variant index Arrow IPC payload")?;
+    drop(writer);
+    Ok(payload)
+}
+
+fn read_variant_record_from_path(
+    path: impl AsRef<Path>,
+    header: &ContainerHeader,
+    pointer: &VariantRecordPointer,
+) -> anyhow::Result<Variant> {
+    let descriptor = header
+        .section(&SectionName::VariantRecords)
+        .context("missing variant_records section")?;
+    ensure_variant_record_pointer(descriptor, pointer)?;
+
+    let mut reader = File::open(path.as_ref())
+        .with_context(|| format!("failed to open {}", path.as_ref().display()))?;
+    let block = read_variant_record_block(&mut reader, descriptor, pointer)?;
+
+    decode_variant_record_from_block(&block, pointer)
+}
+
+fn read_variant_records_from_path(
+    path: impl AsRef<Path>,
+    header: &ContainerHeader,
+    pointers: &[VariantRecordPointer],
+) -> anyhow::Result<Vec<Variant>> {
+    let descriptor = header
+        .section(&SectionName::VariantRecords)
+        .context("missing variant_records section")?;
+    for pointer in pointers {
+        ensure_variant_record_pointer(descriptor, pointer)?;
+    }
+
+    let mut reader = File::open(path.as_ref())
+        .with_context(|| format!("failed to open {}", path.as_ref().display()))?;
+    let mut block_cache: BTreeMap<(u64, u64, u64), Vec<u8>> = BTreeMap::new();
+    let mut variants = Vec::with_capacity(pointers.len());
+
+    for pointer in pointers {
+        let block_key = (
+            pointer.block_offset,
+            pointer.block_length,
+            pointer.block_uncompressed_length,
+        );
+        if !block_cache.contains_key(&block_key) {
+            let block = read_variant_record_block(&mut reader, descriptor, pointer)?;
+            block_cache.insert(block_key, block);
+        }
+        let block = block_cache
+            .get(&block_key)
+            .context("variant record block missing from cache")?;
+        variants.push(decode_variant_record_from_block(block, pointer)?);
+    }
+
+    Ok(variants)
+}
+
+fn read_variant_record_block(
+    reader: &mut File,
+    descriptor: &SectionDescriptor,
+    pointer: &VariantRecordPointer,
+) -> anyhow::Result<Vec<u8>> {
+    reader.seek(SeekFrom::Start(descriptor.offset + pointer.block_offset))?;
+
+    let mut block = vec![0; pointer.block_length as usize];
+    reader.read_exact(&mut block)?;
+    let block = zstd::stream::decode_all(Cursor::new(block))
+        .context("failed to zstd-decompress variant record block")?;
+    if block.len() as u64 != pointer.block_uncompressed_length {
+        bail!(
+            "variant record block length mismatch: expected {}, decoded {}",
+            pointer.block_uncompressed_length,
+            block.len()
+        );
+    }
+    Ok(block)
+}
+
+fn decode_variant_record_from_block(
+    block: &[u8],
+    pointer: &VariantRecordPointer,
+) -> anyhow::Result<Variant> {
+    let start = pointer.record_offset_in_block as usize;
+    let end = start + pointer.record_length_in_block as usize;
+    let record = block
+        .get(start..end)
+        .context("variant record pointer is outside decoded block")?;
+    bitcode::decode(record).context("failed to decode Variant bitcode record")
+}
+
+fn ensure_variant_record_pointer(
+    descriptor: &SectionDescriptor,
+    pointer: &VariantRecordPointer,
+) -> anyhow::Result<()> {
+    ensure_descriptor_kind(
+        descriptor,
+        &SectionName::VariantRecords,
+        SectionKind::RawBytes,
+    )?;
+    if descriptor.compression.algorithm != CompressionAlgorithm::None {
+        bail!("variant_records section must not use whole-section compression");
+    }
+    let block_end = pointer
+        .block_offset
+        .checked_add(pointer.block_length)
+        .context("variant record block offset overflow")?;
+    if block_end > descriptor.length {
+        bail!("variant record block pointer is outside variant_records section");
+    }
+    let record_end = pointer
+        .record_offset_in_block
+        .checked_add(pointer.record_length_in_block)
+        .context("variant record offset overflow")?;
+    if record_end > pointer.block_uncompressed_length {
+        bail!("variant record pointer is outside decoded block");
+    }
+    Ok(())
+}
+
+#[derive(Debug, Clone)]
+struct VariantIndexRow {
+    pointer: VariantRecordPointer,
+    contig: String,
+    contig_id: u8,
+    position: u32,
+    reference: String,
+    alternative: String,
+    variant_hash: String,
+    n_callers: u8,
+    callers: String,
+    variant_categories: String,
+    quality: Option<f32>,
+    filter: Option<String>,
+    gene: Option<String>,
+    ensembl_feature: Option<String>,
+    impact: Option<String>,
+    consequence: Option<String>,
+    hgvs: Option<String>,
+    panel: Option<String>,
+    has_cosmic: bool,
+    has_gnomad: bool,
+    is_cpg: bool,
+    is_repeat: bool,
+    is_vntr: bool,
+    low_entropy: bool,
+    low_mapq: bool,
+    high_depth: bool,
+}
+
+impl VariantIndexRow {
+    fn from_variant(variant: &Variant, pointer: &VariantRecordPointer) -> Self {
+        let best_vep = variant.best_vep().ok();
+        let variant_categories = variant
+            .alteration_category()
+            .iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>()
+            .join(",");
+        let first_vcf = variant.vcf_variants.first();
+        let panel = variant
+            .annotations
+            .iter()
+            .find_map(|annotation| match annotation {
+                Annotation::Panel(panel) => Some(panel.clone()),
+                _ => None,
+            });
+
+        Self {
+            pointer: pointer.clone(),
+            contig: variant.position.contig(),
+            contig_id: variant.position.contig,
+            position: variant.position.position,
+            reference: variant.reference.to_string(),
+            alternative: variant.alternative.to_string(),
+            variant_hash: hex::encode(variant.hash.to_bytes()),
+            n_callers: variant.n_callers(),
+            callers: variant.callers(),
+            variant_categories,
+            quality: first_vcf.and_then(|vcf| vcf.quality),
+            filter: first_vcf.map(|vcf| vcf.filter.to_string()),
+            gene: best_vep.as_ref().and_then(|vep| vep.gene()),
+            ensembl_feature: best_vep.as_ref().and_then(|vep| vep.feature()),
+            impact: best_vep
+                .as_ref()
+                .and_then(|vep| vep.impact().map(|impact| impact.to_string())),
+            consequence: best_vep
+                .as_ref()
+                .map(|vep| vep.consequences_str().join(",")),
+            hgvs: best_vep.as_ref().and_then(|vep| vep.hgvs()),
+            panel,
+            has_cosmic: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::Cosmic(_))),
+            has_gnomad: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::GnomAD(_))),
+            is_cpg: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::CpG)),
+            is_repeat: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::Repeat)),
+            is_vntr: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::VNTR)),
+            low_entropy: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::LowEntropy)),
+            low_mapq: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::LowMAPQ)),
+            high_depth: variant
+                .annotations
+                .iter()
+                .any(|annotation| matches!(annotation, Annotation::HighDepth)),
+        }
+    }
+}
+
 fn message_pack_section<T: Serialize>(
     name: SectionName,
     payload: &T,
@@ -589,16 +1351,24 @@ fn ensure_section_kind(
     expected_name: &SectionName,
     expected_kind: SectionKind,
 ) -> anyhow::Result<()> {
-    if &section.descriptor.name != expected_name {
+    ensure_descriptor_kind(&section.descriptor, expected_name, expected_kind)
+}
+
+fn ensure_descriptor_kind(
+    descriptor: &SectionDescriptor,
+    expected_name: &SectionName,
+    expected_kind: SectionKind,
+) -> anyhow::Result<()> {
+    if &descriptor.name != expected_name {
         bail!(
             "expected section {expected_name:?}, found {:?}",
-            section.descriptor.name
+            descriptor.name
         );
     }
-    if section.descriptor.kind != expected_kind {
+    if descriptor.kind != expected_kind {
         bail!(
             "expected section kind {expected_kind:?}, found {:?}",
-            section.descriptor.kind
+            descriptor.kind
         );
     }
     Ok(())
@@ -842,6 +1612,57 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn validates_container_integrity() -> anyhow::Result<()> {
+        let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
+        let section = PendingSection::new(
+            SectionName::Provenance,
+            SectionKind::RawBytes,
+            b"hello validation".to_vec(),
+        );
+
+        write_container(&path, test_header(), vec![section])?;
+
+        let report = validate_pandora(&path);
+        assert!(report.valid, "{:?}", report.errors);
+        assert_eq!(report.sections.len(), 1);
+        assert!(report.sections[0].readable);
+        assert!(report.sections[0].checksum_valid);
+        assert!(report.sections[0].decompresses);
+
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+
+    #[test]
+    fn validation_reports_corrupt_section() -> anyhow::Result<()> {
+        let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
+        let section = PendingSection::new(
+            SectionName::Provenance,
+            SectionKind::RawBytes,
+            b"hello corruption".to_vec(),
+        );
+
+        write_container(&path, test_header(), vec![section])?;
+        let (_, header) = read_header(&path)?;
+        let offset = header.sections[0].offset;
+
+        let mut file = std::fs::OpenOptions::new().write(true).open(&path)?;
+        file.seek(SeekFrom::Start(offset))?;
+        file.write_all(b"x")?;
+        drop(file);
+
+        let report = validate_pandora(&path);
+        assert!(!report.valid);
+        assert_eq!(report.sections.len(), 1);
+        assert!(report.sections[0].readable);
+        assert!(!report.sections[0].checksum_valid);
+        assert!(report.errors[0].contains("checksum mismatch"));
+
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+
     #[test]
     fn rejects_bad_magic() -> anyhow::Result<()> {
         let bytes = [0u8; PANDORA_PRELUDE_LEN];

+ 7 - 3
src/pipes/somatic.rs

@@ -33,9 +33,10 @@ use crate::{
     config::Config,
     create_should_run, init_somatic_callers,
     io::somaticpipe_container::{
-        bam_qc_section, copy_number_section, pipe_qc_section, provenance_section, variants_section,
-        BamQcPayload, CompressionMetadata, ContainerHeader, EncryptionMetadata, PipeQcPayload,
-        ProducerMetadata, ProvenancePayload, SampleMetadata,
+        bam_qc_section, copy_number_section, pipe_qc_section, provenance_section,
+        variant_records_and_index_sections, variants_section, BamQcPayload, CompressionMetadata,
+        ContainerHeader, EncryptionMetadata, PipeQcPayload, ProducerMetadata, ProvenancePayload,
+        SampleMetadata,
     },
     runners::Run,
     variant::{
@@ -575,6 +576,9 @@ fn write_pandora_output(
 ) -> anyhow::Result<()> {
     let mut sections = Vec::new();
     sections.push(variants_section(variants));
+    let (variant_records, variant_index) = variant_records_and_index_sections(variants)?;
+    sections.push(variant_records);
+    sections.push(variant_index);
 
     match SavanaCN::parse_file(id, config) {
         Ok(copy_number) => sections.push(copy_number_section(&copy_number)?.optional()),