Thomas 1 deň pred
rodič
commit
871e6e4129
5 zmenil súbory, kde vykonal 96 pridanie a 34 odobranie
  1. 8 7
      src/io/bed.rs
  2. 72 12
      src/io/writers.rs
  3. 2 2
      src/runners.rs
  4. 5 4
      src/scan/scan.rs
  5. 9 9
      src/variant/variants_stats.rs

+ 8 - 7
src/io/bed.rs

@@ -14,11 +14,10 @@ use anyhow::Context;
 use log::warn;
 use rayon::prelude::*;
 
-use crate::{annotation::Annotation, io::writers::get_gz_writer, positions::{GenomePosition, GenomeRange, GetGenomeRange, extract_contig_indices, find_contig_indices, overlaps_par}, variant::variant_collection::Variants};
+use crate::{annotation::Annotation, io::writers::{finalize_bgzf_file, get_gz_writer}, positions::{GenomePosition, GenomeRange, GetGenomeRange, extract_contig_indices, find_contig_indices, overlaps_par}, variant::variant_collection::Variants};
 
 use super::readers::get_reader;
 
-use noodles_bgzf as nbgzf;
 use noodles_core::Position;
 use noodles_csi::binning_index::index::reference_sequence::bin::Chunk;
 use noodles_csi::binning_index::index::Header;
@@ -316,13 +315,14 @@ pub fn convert_bgz_with_tabix(input: impl AsRef<Path>, force: bool) -> anyhow::R
         }
 
         // Record start virtual offset for this line in the *output* bgzf stream.
-        let chunk_start = writer.bgzf_pos(); // BGZF virtual offset :contentReference[oaicite:2]{index=2}
+        // let chunk_start = writer.bgzf_pos(); // BGZF virtual offset :contentReference[oaicite:2]{index=2}
 
+        let chunk_start = writer.virtual_position();
         // Write line as-is.
         writer.write_all(line.as_bytes())?;
 
         // Record end virtual offset after writing the line.
-        let chunk_end = writer.bgzf_pos(); // :contentReference[oaicite:3]{index=3}
+        let chunk_end = writer.virtual_position(); // :contentReference[oaicite:3]{index=3}
 
         // Add to tabix index if it's a data line (skip meta/header lines).
         if !line.starts_with('#') && !line.trim().is_empty() {
@@ -354,8 +354,8 @@ pub fn convert_bgz_with_tabix(input: impl AsRef<Path>, force: bool) -> anyhow::R
                 .context("BED: end must be >= 1 for tabix indexing")?;
 
             let chunk = Chunk::new(
-                nbgzf::VirtualPosition::from(chunk_start),
-                nbgzf::VirtualPosition::from(chunk_end),
+                chunk_start,
+                chunk_end,
             ); // chunk is [start, end) in virtual offsets :contentReference[oaicite:4]{index=4}
 
             indexer
@@ -364,7 +364,8 @@ pub fn convert_bgz_with_tabix(input: impl AsRef<Path>, force: bool) -> anyhow::R
         }
     }
 
-    writer.close()?; // writes EOF marker :contentReference[oaicite:5]{index=5}
+    finalize_bgzf_file(writer, &out_bgz)?;
+    // writer.close()?; // writes EOF marker :contentReference[oaicite:5]{index=5}
 
     let index = indexer.build(); // :contentReference[oaicite:6]{index=6}
     tabix::fs::write(&out_tbi, &index)?; // :contentReference[oaicite:7]{index=7}

+ 72 - 12
src/io/writers.rs

@@ -1,33 +1,56 @@
 use std::{
     fs::{self, File, OpenOptions},
-    io::BufWriter,
+    io::{BufWriter, Write},
     path::Path,
 };
 
 use anyhow::Context;
-use bgzip::{BGZFWriter, Compression};
+// use bgzip::{BGZFWriter, Compression};
 use log::info;
 
 use crate::io::readers::get_reader;
 
-pub fn get_gz_writer(path: &str, force: bool) -> anyhow::Result<BGZFWriter<File>> {
+// pub fn get_gz_writer(path: &str, force: bool) -> anyhow::Result<BGZFWriter<File>> {
+//     if !path.ends_with(".gz") {
+//         anyhow::bail!("The file should end with gz");
+//     }
+//
+//     if force && Path::new(path).exists() {
+//         fs::remove_file(path).with_context(|| anyhow::anyhow!("Failed to remove file: {path}"))?;
+//     }
+//
+//     let file = OpenOptions::new()
+//         .write(true) // Open the file for writing
+//         .create_new(true)
+//         .truncate(true)
+//         .open(path)
+//         .with_context(|| anyhow::anyhow!("failed to open the file: {path}"))?;
+//
+//     info!("Writing into {path}");
+//     Ok(BGZFWriter::new(file, Compression::default()))
+// }
+
+use noodles_bgzf as bgzf;
+
+pub fn get_gz_writer(
+    path: &str,
+    force: bool,
+) -> anyhow::Result<bgzf::io::Writer<BufWriter<std::fs::File>>> {
     if !path.ends_with(".gz") {
-        anyhow::bail!("The file should end with gz");
+        anyhow::bail!("file should end with .gz: {path}");
     }
 
-    if force && Path::new(path).exists() {
-        fs::remove_file(path).with_context(|| anyhow::anyhow!("Failed to remove file: {path}"))?;
+    if force && std::path::Path::new(path).exists() {
+        fs::remove_file(path).with_context(|| format!("failed to remove file: {path}"))?;
     }
 
     let file = OpenOptions::new()
-        .write(true) // Open the file for writing
+        .write(true)
         .create_new(true)
-        .truncate(true)
         .open(path)
-        .with_context(|| anyhow::anyhow!("failed to open the file: {path}"))?;
+        .with_context(|| format!("failed to create BGZF file: {path}"))?;
 
-    info!("Writing into {path}");
-    Ok(BGZFWriter::new(file, Compression::default()))
+    Ok(bgzf::io::Writer::new(BufWriter::new(file)))
 }
 
 pub fn get_writer(path: &str) -> anyhow::Result<BufWriter<File>> {
@@ -50,7 +73,44 @@ pub fn convert_bgz(input: impl AsRef<Path>, force: bool) -> anyhow::Result<()> {
 
     std::io::copy(&mut reader, &mut writer)?;
 
-    writer.close()?; // or writer.finish()? if gzip encoder
+    writer
+        .try_finish()
+        .with_context(|| format!("failed finishing BGZF writer: {}", input.display()))?;
+
+    let mut inner = writer
+        .finish()
+        .with_context(|| format!("failed returning inner BGZF writer: {}", input.display()))?;
+
+    inner
+        .flush()
+        .with_context(|| format!("failed flushing inner writer: {}", input.display()))?;
+
+    inner
+        .into_inner()
+        .with_context(|| format!("failed unwrapping BufWriter: {}", input.display()))?
+        .sync_all()
+        .with_context(|| format!("failed syncing file: {}", input.display()))?;
+
+    Ok(())
+}
+
+pub fn finalize_bgzf_file(
+    writer: bgzf::io::Writer<BufWriter<File>>,
+    path: &str,
+) -> anyhow::Result<()> {
+    let mut inner = writer
+        .finish()
+        .with_context(|| format!("failed finishing BGZF writer: {path}"))?;
+
+    inner
+        .flush()
+        .with_context(|| format!("failed flushing BufWriter: {path}"))?;
+
+    inner
+        .into_inner()
+        .with_context(|| format!("failed unwrapping BufWriter: {path}"))?
+        .sync_all()
+        .with_context(|| format!("failed syncing file: {path}"))?;
 
     Ok(())
 }

+ 2 - 2
src/runners.rs

@@ -12,7 +12,7 @@ use log::info;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
-use crate::io::writers::get_gz_writer;
+use crate::io::writers::{finalize_bgzf_file, get_gz_writer};
 
 /// Trait for running a command.
 pub trait Run {
@@ -122,7 +122,7 @@ impl RunReport {
         let mut writer = get_gz_writer(&file_path, true)
             .with_context(|| format!("Failed to get gz writer for: {file_path}"))?;
         writer.write_all(json_data.as_bytes())?;
-        writer.close()?;
+        finalize_bgzf_file(writer, &file_path)?;
 
         Ok(())
     }

+ 5 - 4
src/scan/scan.rs

@@ -61,7 +61,7 @@ use crate::helpers::{bam_contigs, get_genome_sizes, is_file_older};
 use crate::io::bam::fb_inv_from_record;
 use crate::io::readers::get_gz_reader;
 use crate::io::tsv::tsv_reader;
-use crate::io::writers::get_gz_writer;
+use crate::io::writers::{finalize_bgzf_file, get_gz_writer};
 use crate::math::filter_outliers_modified_z_score_with_indices;
 
 use crate::config::Config;
@@ -811,9 +811,10 @@ pub fn par_whole_scan(id: &str, time_point: &str, config: &Config) -> anyhow::Re
                     .flush()
                     .with_context(|| format!("failed flushing BGZF writer: {tmp_path_str}"))?;
 
-                writer.close().with_context(|| {
-                    format!("failed closing/finalizing BGZF writer: {tmp_path_str}")
-                })?;
+                finalize_bgzf_file(writer, tmp_path_str)?;
+                // writer.close().with_context(|| {
+                //     format!("failed closing/finalizing BGZF writer: {tmp_path_str}")
+                // })?;
             }
 
             validate_count_file(tmp_path_str, contig, bins.len())

+ 9 - 9
src/variant/variants_stats.rs

@@ -164,18 +164,17 @@ use rayon::prelude::*;
 use serde::{Deserialize, Serialize, Serializer};
 
 use crate::{
-    annotation::{vep::VepImpact, Annotation},
+    annotation::{Annotation, vep::VepImpact},
     config::Config,
     helpers::bin_data,
     io::{
         bed::read_bed, dict::read_dict, gff::features_ranges, readers::get_gz_reader,
-        tsv::tsv_reader, writers::get_gz_writer,
+        tsv::tsv_reader, writers::{finalize_bgzf_file, get_gz_writer},
     },
     positions::{
-        contig_to_num, merge_overlapping_genome_ranges, par_overlaps, range_intersection_par,
-        GenomeRange,
+        GenomeRange, contig_to_num, merge_overlapping_genome_ranges, par_overlaps, range_intersection_par
     },
-    scan::bin::{parse_bin_record_into, BinRowBuf},
+    scan::bin::{BinRowBuf, parse_bin_record_into},
 };
 
 use super::variant_collection::{Variant, Variants};
@@ -454,10 +453,11 @@ impl VariantsStats {
 
         serde_json::to_writer(&mut writer, self)
             .with_context(|| anyhow::anyhow!("Failed to serialize JSON to file: {}", filename))?;
-
-        writer.close().with_context(|| {
-            anyhow::anyhow!("Failed to close BGZF writer for file: {}", filename)
-        })?;
+        finalize_bgzf_file(writer, filename)?;
+        //
+        // writer.close().with_context(|| {
+        //     anyhow::anyhow!("Failed to close BGZF writer for file: {}", filename)
+        // })?;
 
         debug!("Successfully saved variants to {}", filename);
         Ok(())