Bläddra i källkod

refine somatic pandora output

STEIMLE Thomas 2 veckor sedan
förälder
incheckning
b28f23a056
5 ändrade filer med 368 tillägg och 28 borttagningar
  1. 0 4
      Cargo.lock
  2. 36 0
      README.md
  3. 195 18
      src/io/somaticpipe_container.rs
  4. 49 5
      src/pipes/somatic.rs
  5. 88 1
      src/variant/variant_collection.rs

+ 0 - 4
Cargo.lock

@@ -1376,8 +1376,6 @@ dependencies = [
 [[package]]
 name = "hts-sys"
 version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e38d7f1c121cd22aa214cb4dadd4277dc5447391eac518b899b29ba6356fbbb2"
 dependencies = [
  "bindgen",
  "bzip2-sys",
@@ -2630,8 +2628,6 @@ dependencies = [
 [[package]]
 name = "rust-htslib"
 version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f22161678c3d72e6434c5f3383325dbf88c3cacce665f0c7b4b077fc6e957ba9"
 dependencies = [
  "bio-types",
  "byteorder",

+ 36 - 0
README.md

@@ -10,6 +10,42 @@ sudo apt install cmake libclang-dev
 
 ```
 
+### Windows build
+
+Install the native dependencies with Rtools/MSYS2:
+
+```powershell
+pacman -S --needed mingw-w64-x86_64-curl mingw-w64-x86_64-sqlite3 mingw-w64-x86_64-openssl mingw-w64-x86_64-tre mingw-w64-x86_64-libiconv mingw-w64-x86_64-gettext
+```
+
+Make sure these user environment variables are set:
+
+```powershell
+[Environment]::SetEnvironmentVariable("LIBCLANG_PATH", "$env:USERPROFILE\tools\libclang-win", "User")
+[Environment]::SetEnvironmentVariable("PKG_CONFIG", "C:\rtools45\usr\bin\pkg-config.exe", "User")
+[Environment]::SetEnvironmentVariable("PKG_CONFIG_PATH", "C:\rtools45\mingw64\lib\pkgconfig", "User")
+```
+
+Also ensure the user `Path` contains:
+
+```text
+%USERPROFILE%\tools\libclang-win
+C:\rtools45\mingw64\bin
+C:\rtools45\usr\bin
+```
+
+Then open a fresh PowerShell in the repository and run:
+
+```powershell
+.\setup-patches.ps1
+cargo build
+```
+
+`setup-patches.ps1` generates the Windows-only local Cargo patch config in
+`.cargo/config.toml` and patched crate sources in `patches/`. Both are ignored
+by git. This keeps Linux builds on the registry crates while Windows builds use
+the patched `hts-sys` / `rust-htslib` sources.
+
 * minimap2 
 * (samtools)[https://www.htslib.org/download/]
 * (dorado)[https://github.com/nanoporetech/dorado]

+ 195 - 18
src/io/somaticpipe_container.rs

@@ -271,6 +271,7 @@ pub struct VariantRecordPointer {
     pub block_offset: u64,
     pub block_length: u64,
     pub block_uncompressed_length: u64,
+    pub block_compression: CompressionMetadata,
     pub record_offset_in_block: u64,
     pub record_length_in_block: u64,
 }
@@ -606,9 +607,8 @@ pub fn write_container(
         .map(encode_pending_section)
         .collect::<anyhow::Result<Vec<_>>>()?;
 
-    let (_, descriptors) = finalize_header_sections(&header, &stored_sections)?;
+    let (header_bytes, descriptors) = finalize_header_sections(&header, &stored_sections)?;
     header.sections = descriptors;
-    let header_bytes = encode_header(&header)?;
     let header_checksum = blake3::hash(&header_bytes);
 
     let prelude = ContainerPrelude {
@@ -621,6 +621,13 @@ pub fn write_container(
         .with_context(|| format!("failed to create {}", path.as_ref().display()))?;
     prelude.write_to(&mut writer)?;
     writer.write_all(&header_bytes)?;
+    if let Some(first_section) = header.sections.first() {
+        let written = (PANDORA_PRELUDE_LEN + header_bytes.len()) as u64;
+        if first_section.offset > written {
+            let padding = vec![0; (first_section.offset - written) as usize];
+            writer.write_all(&padding)?;
+        }
+    }
     for stored in stored_sections {
         writer.write_all(&stored.payload)?;
     }
@@ -639,6 +646,11 @@ pub fn read_header(path: impl AsRef<Path>) -> anyhow::Result<(ContainerPrelude,
 
     let header = decode_header(&header_bytes)?;
     validate_header(&header)?;
+    validate_section_layout(
+        &header,
+        (PANDORA_PRELUDE_LEN + header_bytes.len()) as u64,
+        reader.metadata()?.len(),
+    )?;
     Ok((prelude, header))
 }
 
@@ -902,6 +914,7 @@ fn flush_variant_record_block(
             block_offset,
             block_length,
             block_uncompressed_length,
+            block_compression: block_compression.clone(),
             record_offset_in_block,
             record_length_in_block,
         });
@@ -927,6 +940,8 @@ fn encode_variant_index_arrow(
         Field::new("block_offset", DataType::UInt64, false),
         Field::new("block_length", DataType::UInt64, false),
         Field::new("block_uncompressed_length", DataType::UInt64, false),
+        Field::new("block_compression", DataType::Utf8, false),
+        Field::new("block_compression_level", DataType::UInt8, true),
         Field::new("record_offset_in_block", DataType::UInt64, false),
         Field::new("record_length_in_block", DataType::UInt64, false),
         Field::new("contig", DataType::Utf8, false),
@@ -977,6 +992,24 @@ fn encode_variant_index_arrow(
                 .map(|row| row.pointer.block_uncompressed_length)
                 .collect::<Vec<_>>(),
         )),
+        Arc::new(StringArray::from(
+            rows.iter()
+                .map(|row| match row.pointer.block_compression.algorithm {
+                    CompressionAlgorithm::None => "none",
+                    CompressionAlgorithm::Zstd => "zstd",
+                })
+                .collect::<Vec<_>>(),
+        )),
+        Arc::new(UInt8Array::from(
+            rows.iter()
+                .map(|row| {
+                    row.pointer
+                        .block_compression
+                        .level
+                        .and_then(|level| u8::try_from(level).ok())
+                })
+                .collect::<Vec<_>>(),
+        )),
         Arc::new(UInt64Array::from(
             rows.iter()
                 .map(|row| row.pointer.record_offset_in_block)
@@ -1114,6 +1147,7 @@ fn read_variant_record_from_path(
     let descriptor = header
         .section(&SectionName::VariantRecords)
         .context("missing variant_records section")?;
+
     ensure_variant_record_pointer(descriptor, pointer)?;
 
     let mut reader = File::open(path.as_ref())
@@ -1146,10 +1180,11 @@ fn read_variant_records_from_path(
             pointer.block_length,
             pointer.block_uncompressed_length,
         );
-        if !block_cache.contains_key(&block_key) {
+        if let std::collections::btree_map::Entry::Vacant(e) = block_cache.entry(block_key) {
             let block = read_variant_record_block(&mut reader, descriptor, pointer)?;
-            block_cache.insert(block_key, block);
+            e.insert(block);
         }
+
         let block = block_cache
             .get(&block_key)
             .context("variant record block missing from cache")?;
@@ -1168,8 +1203,8 @@ fn read_variant_record_block(
 
     let mut block = vec![0; pointer.block_length as usize];
     reader.read_exact(&mut block)?;
-    let block = zstd::stream::decode_all(Cursor::new(block))
-        .context("failed to zstd-decompress variant record block")?;
+    let block = decode_section_payload(&block, &pointer.block_compression)
+        .context("failed to decode variant record block")?;
     if block.len() as u64 != pointer.block_uncompressed_length {
         bail!(
             "variant record block length mismatch: expected {}, decoded {}",
@@ -1401,6 +1436,59 @@ fn validate_header(header: &ContainerHeader) -> anyhow::Result<()> {
     Ok(())
 }
 
+fn validate_section_layout(
+    header: &ContainerHeader,
+    data_start: u64,
+    file_len: u64,
+) -> anyhow::Result<()> {
+    let mut ranges = Vec::with_capacity(header.sections.len());
+    let mut seen = Vec::with_capacity(header.sections.len());
+
+    for section in &header.sections {
+        let name = format!("{:?}", section.name);
+        if seen.iter().any(|existing| existing == &name) {
+            bail!("duplicate .pandora section: {name}");
+        }
+        seen.push(name.clone());
+
+        if section.offset < data_start {
+            bail!(
+                "{name} section starts inside .pandora prelude/header: offset {} < {}",
+                section.offset,
+                data_start
+            );
+        }
+
+        let end = section
+            .offset
+            .checked_add(section.length)
+            .with_context(|| format!("{name} section offset/length overflow"))?;
+        if end > file_len {
+            bail!(
+                "{name} section extends beyond file: end {} > {}",
+                end,
+                file_len
+            );
+        }
+        ranges.push((section.offset, end, name));
+    }
+
+    ranges.sort_by_key(|(start, _, _)| *start);
+    for pair in ranges.windows(2) {
+        let (_, previous_end, previous_name) = &pair[0];
+        let (next_start, _, next_name) = &pair[1];
+        if previous_end > next_start {
+            bail!(
+                "{previous_name} section overlaps {next_name}: end {} > start {}",
+                previous_end,
+                next_start
+            );
+        }
+    }
+
+    Ok(())
+}
+
 fn verify_checksum(bytes: &[u8], expected: &[u8; 32], label: &str) -> anyhow::Result<()> {
     let actual = blake3::hash(bytes);
     if actual.as_bytes() != expected {
@@ -1474,17 +1562,18 @@ fn finalize_header_sections(
     header: &ContainerHeader,
     stored_sections: &[StoredSection],
 ) -> anyhow::Result<(Vec<u8>, Vec<SectionDescriptor>)> {
-    let mut descriptors: Vec<SectionDescriptor> = stored_sections
+    let base_descriptors: Vec<SectionDescriptor> = stored_sections
         .iter()
         .map(|stored| stored.descriptor.clone())
         .collect();
+    let mut descriptors = base_descriptors.clone();
+    let mut probe = header.clone();
+    probe.sections = descriptors.clone();
+    let mut reserved_header_len = encode_header(&probe)?.len() + 1024;
 
-    let mut previous_header_len = None;
     for _ in 0..16 {
-        let mut probe = header.clone();
-        probe.sections = descriptors.clone();
-        let header_bytes = encode_header(&probe)?;
-        let mut offset = (PANDORA_PRELUDE_LEN + header_bytes.len()) as u64;
+        descriptors = base_descriptors.clone();
+        let mut offset = (PANDORA_PRELUDE_LEN + reserved_header_len) as u64;
 
         for (descriptor, stored) in descriptors.iter_mut().zip(stored_sections) {
             descriptor.offset = offset;
@@ -1492,15 +1581,17 @@ fn finalize_header_sections(
             offset += descriptor.length;
         }
 
-        if previous_header_len == Some(header_bytes.len()) {
-            let mut final_header = header.clone();
-            final_header.sections = descriptors.clone();
-            return Ok((encode_header(&final_header)?, descriptors));
+        let mut final_header = header.clone();
+        final_header.sections = descriptors.clone();
+        let header_bytes = encode_header(&final_header)?;
+        if header_bytes.len() <= reserved_header_len {
+            return Ok((header_bytes, descriptors));
         }
-        previous_header_len = Some(header_bytes.len());
+
+        reserved_header_len = header_bytes.len() + 1024;
     }
 
-    bail!("failed to stabilize .pandora header length after offset assignment")
+    bail!("failed to reserve enough .pandora header space after 16 passes")
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -1567,6 +1658,24 @@ mod tests {
         }
     }
 
+    fn test_variant(pos: u32, reference: &str, alternative: &str) -> anyhow::Result<Variant> {
+        let vcf: crate::variant::vcf_variant::VcfVariant = format!(
+            "chr1\t{}\t.\t{}\t{}\t60\tPASS\t.",
+            pos + 1,
+            reference,
+            alternative
+        )
+        .parse()?;
+        Ok(Variant {
+            hash: vcf.hash,
+            position: vcf.position.clone(),
+            reference: vcf.reference.clone(),
+            alternative: vcf.alternative.clone(),
+            vcf_variants: vec![vcf],
+            annotations: Vec::new(),
+        })
+    }
+
     #[test]
     fn writes_and_reads_raw_section() -> anyhow::Result<()> {
         let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
@@ -1634,6 +1743,74 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn reads_lazy_variant_records() -> anyhow::Result<()> {
+        let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
+        let variants = Variants {
+            data: vec![test_variant(99, "A", "T")?, test_variant(199, "G", "C")?],
+        };
+        let (records, _index) = variant_records_and_index_sections(&variants)?;
+
+        write_container(&path, test_header(), vec![records])?;
+        let pointer = VariantRecordPointer {
+            variant_idx: 1,
+            block_offset: 0,
+            block_length: read_header(&path)?
+                .1
+                .section(&SectionName::VariantRecords)
+                .context("missing variant_records section")?
+                .length,
+            block_uncompressed_length: bitcode::encode(&variants.data[0]).len() as u64
+                + bitcode::encode(&variants.data[1]).len() as u64,
+            block_compression: CompressionMetadata::default(),
+            record_offset_in_block: bitcode::encode(&variants.data[0]).len() as u64,
+            record_length_in_block: bitcode::encode(&variants.data[1]).len() as u64,
+        };
+
+        let reader = PandoraReader::open(&path)?;
+        let read = reader.read_variant_record(&pointer)?;
+        assert_eq!(read, variants.data[1]);
+
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+
+    #[test]
+    fn rejects_overlapping_section_layout() {
+        let mut header = test_header();
+        header.sections = vec![
+            SectionDescriptor {
+                name: SectionName::Variants,
+                kind: SectionKind::RawBytes,
+                compression: CompressionMetadata::default(),
+                encryption: EncryptionAlgorithm::None,
+                offset: 128,
+                length: 16,
+                nonce_b64: None,
+                tag_b64: None,
+                checksum: "blake3:unused".to_string(),
+                schema_hash: None,
+                required: true,
+            },
+            SectionDescriptor {
+                name: SectionName::Provenance,
+                kind: SectionKind::RawBytes,
+                compression: CompressionMetadata::default(),
+                encryption: EncryptionAlgorithm::None,
+                offset: 140,
+                length: 8,
+                nonce_b64: None,
+                tag_b64: None,
+                checksum: "blake3:unused".to_string(),
+                schema_hash: None,
+                required: true,
+            },
+        ];
+
+        let err = validate_section_layout(&header, 64, 200).unwrap_err();
+        assert!(err.to_string().contains("overlaps"));
+    }
+
     #[test]
     fn validation_reports_corrupt_section() -> anyhow::Result<()> {
         let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));

+ 49 - 5
src/pipes/somatic.rs

@@ -34,9 +34,9 @@ use crate::{
     create_should_run, init_somatic_callers,
     io::somaticpipe_container::{
         bam_qc_section, copy_number_section, pipe_qc_section, provenance_section,
-        variant_records_and_index_sections, variants_section, BamQcPayload, CompressionMetadata,
-        ContainerHeader, EncryptionMetadata, PipeQcPayload, ProducerMetadata, ProvenancePayload,
-        SampleMetadata,
+        variant_records_and_index_sections, variants_section, BamQcPayload, CallerOutputSummary,
+        CompressionMetadata, ContainerHeader, EncryptionMetadata, FilterStepSummary, PipeQcPayload,
+        ProducerMetadata, ProvenancePayload, SampleMetadata,
     },
     runners::Run,
     variant::{
@@ -623,13 +623,16 @@ fn write_pandora_output(
     let mut annotation_stats = BTreeMap::new();
     annotation_stats.insert("final".to_string(), final_annotation_stats.clone());
 
+    let caller_outputs = caller_output_summaries(&somatic_pipe_stats);
+    let filter_steps = filter_step_summaries(&somatic_pipe_stats);
+
     let pipe_qc = PipeQcPayload {
         somatic_pipe_stats,
         variant_stats: Some(variant_stats),
         annotation_stats,
         vep_stats: Some(vep_stats),
-        caller_outputs: Vec::new(),
-        filter_steps: Vec::new(),
+        caller_outputs,
+        filter_steps,
     };
     sections.push(pipe_qc_section(&pipe_qc)?);
 
@@ -669,6 +672,47 @@ fn write_pandora_output(
     crate::io::somaticpipe_container::write_container(output_path, header, sections)
 }
 
+fn caller_output_summaries(somatic_pipe_stats: &SomaticPipeStats) -> Vec<CallerOutputSummary> {
+    somatic_pipe_stats
+        .input
+        .somatic
+        .iter()
+        .chain(&somatic_pipe_stats.input.solo_tumor)
+        .chain(&somatic_pipe_stats.input.germline)
+        .chain(&somatic_pipe_stats.input.solo_constit)
+        .map(|(caller, n_input)| CallerOutputSummary {
+            caller: caller.to_string(),
+            n_input: *n_input,
+            n_after_filters: None,
+        })
+        .collect()
+}
+
+fn filter_step_summaries(somatic_pipe_stats: &SomaticPipeStats) -> Vec<FilterStepSummary> {
+    vec![
+        FilterStepSummary {
+            name: "germline_or_constit".to_string(),
+            removed: somatic_pipe_stats.n_constit_germline,
+        },
+        FilterStepSummary {
+            name: "low_constit_depth".to_string(),
+            removed: somatic_pipe_stats.n_low_constit,
+        },
+        FilterStepSummary {
+            name: "high_constit_alt".to_string(),
+            removed: somatic_pipe_stats.n_high_alt_constit,
+        },
+        FilterStepSummary {
+            name: "gnomad_and_constit_alt".to_string(),
+            removed: somatic_pipe_stats.n_high_alt_constit_gnomad,
+        },
+        FilterStepSummary {
+            name: "low_entropy".to_string(),
+            removed: somatic_pipe_stats.n_low_entropies,
+        },
+    ]
+}
+
 pub fn const_stats(id: String, config: Config) -> anyhow::Result<()> {
     info!("Loading Germline");
     let annotations = Annotations::default();

+ 88 - 1
src/variant/variant_collection.rs

@@ -8,7 +8,7 @@ use std::{
 
 use crate::{
     callers::nanomonsv::nanomonsv_insert_classify,
-    io::{tsv::TsvLine, vcf::read_vcf},
+    io::{somaticpipe_container::PandoraReader, tsv::TsvLine, vcf::read_vcf},
     runners::Run,
 };
 use anyhow::Context;
@@ -1573,6 +1573,27 @@ impl Variants {
         Ok(decoded)
     }
 
+    pub fn load_from_pandora(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref();
+        info!("Load from .pandora container: {}", path.display());
+        PandoraReader::open(path)
+            .with_context(|| format!("Failed to open .pandora container: {}", path.display()))?
+            .variants()
+            .with_context(|| format!("Failed to load variants section from {}", path.display()))
+    }
+
+    pub fn load_from_path(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref();
+        let filename = path.to_string_lossy();
+        if path.extension().and_then(|ext| ext.to_str()) == Some("pandora") {
+            Self::load_from_pandora(path)
+        } else if filename.ends_with(".json.gz") {
+            Self::load_from_json(&filename)
+        } else {
+            Self::load_from_file(&filename)
+        }
+    }
+
     /// Write the complete VCF to the given output path, using a dict file for contig headers.
     pub fn write_vcf(&self, output_path: &str, dict_path: &str, force: bool) -> anyhow::Result<()> {
         let contigs = crate::io::dict::read_dict(dict_path)?;
@@ -2281,10 +2302,76 @@ mod tests {
         annotation::Caller,
         callers::{clairs::ClairS, savana::Savana},
         helpers::test_init,
+        io::somaticpipe_container::{
+            variants_section, CompressionMetadata, ContainerHeader, EncryptionMetadata,
+            ProducerMetadata, SampleMetadata, PANDORA_FORMAT_VERSION,
+        },
         pipes::Initialize,
         variant::vcf_variant::Variants,
     };
 
+    fn test_pandora_header() -> ContainerHeader {
+        ContainerHeader {
+            format: "somaticpipe.output".to_string(),
+            format_version: PANDORA_FORMAT_VERSION,
+            producer: ProducerMetadata {
+                name: "pandora_lib_promethion".to_string(),
+                pipeline: "SomaticPipe".to_string(),
+                pipeline_version: "0.1.0".to_string(),
+                git_commit: None,
+                created_at: chrono::Utc::now(),
+            },
+            sample: SampleMetadata {
+                sample_id: "sample_001".to_string(),
+                tumor_timepoint: Some("diag".to_string()),
+                normal_timepoint: Some("constit".to_string()),
+                reference: Some("hs1".to_string()),
+                reference_digest: None,
+            },
+            compression: CompressionMetadata::default(),
+            encryption: EncryptionMetadata::default(),
+            sections: Vec::new(),
+        }
+    }
+
+    #[test]
+    fn loads_variants_from_pandora_container() -> anyhow::Result<()> {
+        let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
+        let variants = super::Variants::default();
+        let section = variants_section(&variants);
+
+        crate::io::somaticpipe_container::write_container(
+            &path,
+            test_pandora_header(),
+            vec![section],
+        )?;
+
+        let loaded = super::Variants::load_from_pandora(&path)?;
+        assert_eq!(loaded.data.len(), variants.data.len());
+
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+
+    #[test]
+    fn load_from_path_dispatches_pandora_container() -> anyhow::Result<()> {
+        let path = std::env::temp_dir().join(format!("{}.pandora", uuid::Uuid::new_v4()));
+        let variants = super::Variants::default();
+        let section = variants_section(&variants);
+
+        crate::io::somaticpipe_container::write_container(
+            &path,
+            test_pandora_header(),
+            vec![section],
+        )?;
+
+        let loaded = super::Variants::load_from_path(&path)?;
+        assert_eq!(loaded.data.len(), variants.data.len());
+
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+
     #[test]
     fn annotate_constit() -> anyhow::Result<()> {
         test_init();