readers.rs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. //! File reader helpers for plain-text, BGZF, and Tabix-indexed genomic files.
  2. //!
  3. //! **Important:** `.gz` files are assumed to be **BGZF** (block gzip), not standard gzip.
  4. //! BGZF is the format produced by `bgzip` and used by BAM, VCF, BED.gz, etc.
  5. //! Plain `gzip` output will not decompress correctly here.
  6. //!
  7. //! # Tabix region fetch
  8. //!
  9. //! [`fetch_tabix_lines`] performs random-access region queries against any
  10. //! Tabix-indexed BGZF file, returning the raw data lines that overlap the
  11. //! requested [`GenomeRange`]. Format-specific wrappers in `bed.rs` and `vcf.rs`
  12. //! parse those lines into typed structures.
  13. use std::{
  14. fs::{self, File},
  15. io::{BufRead, BufReader, Read, Write},
  16. path::Path,
  17. };
  18. use anyhow::Context;
  19. use log::debug;
  20. use noodles_bgzf as bgzf;
  21. use noodles_core::{region::Interval, Position};
  22. use noodles_csi::BinningIndex;
  23. use noodles_tabix as tabix;
  24. use crate::{
  25. helpers::TempFileGuard,
  26. io::writers::{finalize_bgzf_file, get_gz_writer},
  27. positions::GenomeRange,
  28. };
  29. /// Type alias for a BGZF reader wrapping any `Read` source.
  30. pub type BGZFReader<R> = bgzf::io::Reader<R>;
  31. /// Open a plain-text or BGZF-compressed file for reading.
  32. ///
  33. /// The file format is detected from the extension. Supported extensions:
  34. /// `gz` (BGZF), `vcf`, `bed`, `tsv`, `json`, `chain`.
  35. ///
  36. /// `.gz` files are opened with [`BGZFReader`] — they must be BGZF-encoded,
  37. /// not plain gzip.
  38. ///
  39. /// # Arguments
  40. ///
  41. /// * `path` - Path to the file
  42. ///
  43. /// # Returns
  44. ///
  45. /// A boxed `Read` impl, buffered internally.
  46. ///
  47. /// # Errors
  48. ///
  49. /// Returns an error if the extension is missing, unrecognised, or the file
  50. /// cannot be opened.
  51. pub fn get_reader(path: &str) -> anyhow::Result<Box<dyn Read>> {
  52. debug!("Reading: {path}");
  53. let file_type = Path::new(path)
  54. .extension()
  55. .and_then(|s| s.to_str())
  56. .with_context(|| format!("can't parse extension from {path}"))?;
  57. anyhow::ensure!(
  58. matches!(file_type, "gz" | "vcf" | "bed" | "tsv" | "json" | "chain"),
  59. "unsupported file extension: {file_type}"
  60. );
  61. let raw_reader = File::open(path).with_context(|| format!("failed to open {path}"))?;
  62. match file_type {
  63. "gz" => Ok(Box::new(BufReader::new(BGZFReader::new(raw_reader)))),
  64. _ => Ok(Box::new(BufReader::new(raw_reader))),
  65. }
  66. }
  67. /// Open a file as a BGZF reader, compressing it first if it is not already `.gz`.
  68. ///
  69. /// If `path` does not end in `.gz`, [`compress_to_bgzip`] is called to produce
  70. /// `<path>.gz` on disk before opening it. This is a **side-effecting** operation:
  71. /// the compressed file is created and cached for subsequent calls.
  72. ///
  73. /// # Arguments
  74. ///
  75. /// * `path` - Path to the input file (plain text or BGZF)
  76. ///
  77. /// # Returns
  78. ///
  79. /// A [`BGZFReader`] over the (possibly newly created) `.gz` file.
  80. ///
  81. /// # Errors
  82. ///
  83. /// Returns an error if the extension cannot be determined, compression fails,
  84. /// or the resulting `.gz` file cannot be opened.
  85. pub fn get_gz_reader(path: &str) -> anyhow::Result<BGZFReader<File>> {
  86. debug!("Reading: {path}");
  87. let file_type = Path::new(path)
  88. .extension()
  89. .and_then(|s| s.to_str())
  90. .with_context(|| format!("can't parse extension from {path}"))?;
  91. let path = if file_type != "gz" {
  92. compress_to_bgzip(path)?
  93. } else {
  94. path.to_string()
  95. };
  96. let file = File::open(&path).with_context(|| format!("failed to open BGZF file: {path}"))?;
  97. Ok(BGZFReader::new(file))
  98. }
  99. /// Compress a plain-text file to BGZF, returning the path of the `.gz` output.
  100. ///
  101. /// If `<input_path>.gz` already exists it is returned immediately (cached).
  102. /// Otherwise the file is written atomically: a UUID-named temp file is created
  103. /// in the same directory as the output (same filesystem, guaranteeing an atomic
  104. /// rename), then renamed to `<input_path>.gz` on success. A [`TempFileGuard`]
  105. /// ensures the temp file is removed automatically if the function fails or panics.
  106. ///
  107. /// # Arguments
  108. ///
  109. /// * `input_path` - Path to the plain-text input file
  110. ///
  111. /// # Returns
  112. ///
  113. /// The path of the BGZF-compressed output file (`<input_path>.gz`).
  114. ///
  115. /// # Errors
  116. ///
  117. /// Returns an error if the input cannot be read, the output cannot be written,
  118. /// or the atomic rename fails.
  119. pub fn compress_to_bgzip(input_path: &str) -> anyhow::Result<String> {
  120. let output_path = format!("{input_path}.gz");
  121. if Path::new(&output_path).exists() {
  122. return Ok(output_path);
  123. }
  124. debug!("Compressing {input_path} → {output_path}");
  125. let tmp_dir = Path::new(input_path)
  126. .parent()
  127. .unwrap_or(Path::new("."));
  128. let mut guard = TempFileGuard::new();
  129. let tmp_path = guard.tmp_path(".gz", tmp_dir);
  130. let tmp_str = tmp_path.to_string_lossy();
  131. let input_file = File::open(input_path)
  132. .with_context(|| format!("failed to open input file: {input_path}"))?;
  133. let mut reader = BufReader::new(input_file);
  134. let mut writer = get_gz_writer(&tmp_str, false)?;
  135. let mut buffer = [0u8; 8192];
  136. loop {
  137. let n = reader
  138. .read(&mut buffer)
  139. .with_context(|| format!("failed reading {input_path}"))?;
  140. if n == 0 { break; }
  141. writer
  142. .write_all(&buffer[..n])
  143. .with_context(|| format!("failed writing {tmp_str}"))?;
  144. }
  145. finalize_bgzf_file(writer, &tmp_str)?;
  146. fs::rename(&tmp_path, &output_path)
  147. .with_context(|| format!("failed to rename {tmp_str} → {output_path}"))?;
  148. guard.disarm();
  149. Ok(output_path)
  150. }
  151. /// Visit every data line in a Tabix-indexed BGZF file that overlaps `region`,
  152. /// calling `on_line` for each one.
  153. ///
  154. /// This is the zero-allocation core of the tabix fetch family. A single `String`
  155. /// buffer is reused across all lines; `on_line` receives a `&str` reference into
  156. /// that buffer — no intermediate `Vec<String>` is ever built. Callers that need
  157. /// a collected result (e.g. [`fetch_tabix_lines`], [`fetch_bed`](super::bed::fetch_bed))
  158. /// simply push into their own accumulator inside the closure.
  159. ///
  160. /// Header lines (`#`) and blank lines are skipped before `on_line` is called.
  161. /// Lines are newline-stripped before being passed to the callback.
  162. ///
  163. /// # Coordinate system
  164. ///
  165. /// `region` is **0-based half-open** `[start, end)`. The conversion to 1-based
  166. /// inclusive Tabix positions is performed internally:
  167. /// - `tabix_start = region.range.start + 1`
  168. /// - `tabix_end = region.range.end`
  169. ///
  170. /// # Arguments
  171. ///
  172. /// * `bgz_path` - Path to the BGZF file; index expected at `<bgz_path>.tbi`
  173. /// * `region` - Genomic interval to query (0-based half-open)
  174. /// * `on_line` - Callback invoked for each matching data line; returning `Err`
  175. /// aborts the fetch immediately and propagates the error
  176. ///
  177. /// # Errors
  178. ///
  179. /// Returns an error if the index or file cannot be read, a seek fails, or
  180. /// `on_line` returns an error.
  181. pub fn fetch_tabix_lines_with<F>(
  182. bgz_path: &str,
  183. region: &GenomeRange,
  184. mut on_line: F,
  185. ) -> anyhow::Result<()>
  186. where
  187. F: FnMut(&str) -> anyhow::Result<()>,
  188. {
  189. let tbi_path = format!("{bgz_path}.tbi");
  190. let index = tabix::fs::read(&tbi_path)
  191. .with_context(|| format!("cannot read tabix index: {tbi_path}"))?;
  192. let rname = region.contig();
  193. let ref_seq_id = match index
  194. .header()
  195. .ok_or_else(|| anyhow::anyhow!("tabix index has no header: {tbi_path}"))?
  196. .reference_sequence_names()
  197. .get_index_of(rname.as_bytes())
  198. {
  199. Some(id) => id,
  200. None => return Ok(()),
  201. };
  202. // 0-based half-open [start, end) → 1-based inclusive [start+1, end]
  203. let interval_start = Position::try_from(region.range.start as usize + 1)
  204. .context("region start overflow")?;
  205. let interval_end = Position::try_from(region.range.end as usize)
  206. .context("region end overflow")?;
  207. let interval = Interval::from(interval_start..=interval_end);
  208. let chunks = index
  209. .query(ref_seq_id, interval)
  210. .context("tabix region query failed")?;
  211. if chunks.is_empty() {
  212. return Ok(());
  213. }
  214. let mut reader = bgzf::io::Reader::new(
  215. File::open(bgz_path).with_context(|| format!("cannot open {bgz_path}"))?,
  216. );
  217. // Single buffer reused across all lines — no per-line allocation.
  218. let mut buf = String::new();
  219. for chunk in chunks {
  220. reader.seek(chunk.start()).context("BGZF seek failed")?;
  221. loop {
  222. if reader.virtual_position() >= chunk.end() {
  223. break;
  224. }
  225. buf.clear();
  226. let n = reader.read_line(&mut buf).context("BGZF read_line failed")?;
  227. if n == 0 {
  228. break;
  229. }
  230. let line = buf.trim_end_matches(|c| c == '\n' || c == '\r');
  231. if !line.is_empty() && !line.starts_with('#') {
  232. on_line(line)?;
  233. }
  234. }
  235. }
  236. Ok(())
  237. }
  238. /// Convenience wrapper over [`fetch_tabix_lines_with`] that collects results
  239. /// into a `Vec<String>`.
  240. ///
  241. /// Prefer [`fetch_tabix_lines_with`] directly when you need to parse or filter
  242. /// lines, to avoid the intermediate string allocation.
  243. ///
  244. /// # Errors
  245. ///
  246. /// Returns an error if the index or file cannot be read.
  247. pub fn fetch_tabix_lines(bgz_path: &str, region: &GenomeRange) -> anyhow::Result<Vec<String>> {
  248. let mut lines = Vec::new();
  249. fetch_tabix_lines_with(bgz_path, region, |line| {
  250. lines.push(line.to_owned());
  251. Ok(())
  252. })?;
  253. Ok(lines)
  254. }