Thomas 1 ماه پیش
والد
کامیت
04885bee38
4فایلهای تغییر یافته به همراه186 افزوده شده و 776 حذف شده
  1. 176 206
      Cargo.lock
  2. 1 2
      Cargo.toml
  3. 9 8
      src/collection/vcf.rs
  4. 0 560
      src/commands/dorado.rs

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 176 - 206
Cargo.lock


+ 1 - 2
Cargo.toml

@@ -16,14 +16,13 @@ csv = "1.3.0"
 serde = { version = "1.0.204", features = ["derive"] }
 serde_json = "1.0"
 tracing = "0.1.40"
-noodles-csi = "0.43.0"
+noodles-csi = "0.53.0"
 num-format = "0.4.4"
 byte-unit = "5.1.4"
 duct = "0.13.7"
 uuid = { version = "1.13.1", features = ["v4"] }
 rayon = "1.10.0"
 hashbrown = { version = "0.15.0", features = ["rayon"] }
-ctrlc = "3.4.4"
 lazy_static = "1.5.0"
 indicatif = "0.17.8"
 rust-htslib = "0.50.0"

+ 9 - 8
src/collection/vcf.rs

@@ -128,14 +128,15 @@ impl VcfCollection {
 
 pub fn n_variants(path: &str) -> anyhow::Result<u64> {
     let csi_src = format!("{path}.csi");
-    let index = csi::read(csi_src).context(format!("can't read index of {path}"))?;
-
-    let mut n = 0;
-    for reference_sequence in index.reference_sequences() {
-        if let Some(metadata) = reference_sequence.metadata() {
-            n += metadata.mapped_record_count()
-        }
-    }
+    let index = csi::fs::read(&csi_src)
+        .with_context(|| format!("can't read index of {path}"))?;
+
+    let n = index
+        .reference_sequences()
+        .iter()
+        .filter_map(|rs| rs.metadata())
+        .map(|m| m.mapped_record_count())
+        .sum();
 
     Ok(n)
 }

+ 0 - 560
src/commands/dorado.rs

@@ -316,563 +316,3 @@ mod tests {
         Ok(())
     }
 }
-
-#[derive(Debug, Clone)]
-pub struct DoradoParams {
-    pub ref_fa: String,
-    pub ref_mmi: String,
-    pub name: String,
-    pub time: String,
-    pub pod_dir: String,
-    pub samtools_view_threads: u16,
-    pub samtools_sort_threads: u16,
-}
-
-// pub struct Dorado {
-//     config: Config,
-//     case: FlowCellCase,
-//     case_dir: String,
-//     time_dir: String,
-//     bam: String,
-//     start_time: SystemTime,
-//     end_time: SystemTime,
-//     is_done: bool,
-// }
-//
-// impl Dorado {
-//     pub fn init(case: FlowCellCase, config: Config) -> anyhow::Result<Self> {
-//         let data_dir = &config.result_dir;
-//         let case_dir = format!("{}/{}", data_dir, case.id);
-//         let time_dir = format!("{}/{}", case_dir, case.time_point);
-//         let bam = format!("{}/{}_{}_hs1.bam", time_dir, case.id, case.time_point);
-//         debug!("Dorado init with config: {config:#?}");
-//         info!("Final BAM file: {bam}");
-//
-//         Ok(Self {
-//             config,
-//             start_time: SystemTime::now(),
-//             end_time: SystemTime::now(),
-//             is_done: false,
-//             case_dir,
-//             time_dir,
-//             bam,
-//             case,
-//         })
-//     }
-//
-//     // ------------------------------------------------------------------
-//     // Small helper to actually execute a shell command
-//     // ------------------------------------------------------------------
-//     fn run_shell(cmdline: &str) -> anyhow::Result<()> {
-//         info!("Running: {cmdline}");
-//         cmd!("bash", "-c", cmdline)
-//             .run()
-//             .map_err(|e| anyhow::anyhow!("Failed to run: {cmdline}\n\t{}", e.to_string()))?;
-//         Ok(())
-//     }
-//
-//     // ------------------------------------------------------------------
-//     // Command builders (return strings)
-//     // ------------------------------------------------------------------
-//
-//     /// minimap2 index creation (returns None if index already exists)
-//     fn create_reference_mmi_cmd(&self) -> Option<String> {
-//         if std::path::Path::new(&self.config.align.ref_mmi).exists() {
-//             None
-//         } else {
-//             Some(format!(
-//                 "minimap2 -x map-ont -d {} {}",
-//                 self.config.align.ref_mmi, self.config.align.ref_fa
-//             ))
-//         }
-//     }
-//
-//     /// Dorado + samtools pipeline for basecalling + alignment
-//     fn basecall_align_cmd(&self, dorado_bin: &str) -> anyhow::Result<String> {
-//         let pod_dir = &self.case.pod_dir;
-//         let ref_fa = &self.config.align.ref_fa;
-//         let bam = &self.bam;
-//         let samtools = &self.config.align.samtools_bin;
-//         let samtools_view_threads = self.config.align.samtools_view_threads;
-//         let samtools_sort_threads = self.config.align.samtools_sort_threads;
-//         let dorado_arg = self.config.align.dorado_basecall_arg.clone();
-//
-//         let pod_path = fs::read_dir(pod_dir)
-//             .map_err(|e| anyhow::anyhow!("Failed to read pod5 dir: {}.\n\t{e}", pod_dir.display()))?
-//             .filter_map(|p| p.ok())
-//             .map(|p| p.path())
-//             .filter(|p| p.extension().unwrap() == "pod5")
-//             .take(1)
-//             .collect::<Vec<PathBuf>>()
-//             .pop()
-//             .unwrap();
-//
-//         let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
-//             .sequencing_kit
-//             .to_uppercase();
-//
-//         let dorado = format!(
-//             "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {} --trim all --emit-moves --reference {ref_fa}",
-//             pod_dir.display()
-//         );
-//         info!("Dorado command: {dorado}");
-//
-//         let samtools_view = format!("{samtools} view -h -@ {samtools_view_threads} -b /dev/stdin");
-//         let samtools_sort =
-//             format!("{samtools} sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
-//
-//         Ok(format!("{dorado} | {samtools_view} | {samtools_sort}"))
-//     }
-//
-//     /// samtools index command
-//     fn index_cmd(&self) -> String {
-//         let t = self.config.align.samtools_view_threads.to_string();
-//         format!(
-//             "{} index -@ {t} {}",
-//             self.config.align.samtools_bin, self.bam
-//         )
-//     }
-//
-//     /// cramino QC command
-//     fn cramino_cmd(&self) -> String {
-//         format!("cramino -t 150 --karyotype {}", self.bam)
-//     }
-//
-//     /// modkit summary command
-//     fn modkit_cmd(&self) -> String {
-//         format!("modkit summary -t 50 {}", self.bam)
-//     }
-//
-//     /// fastq export pipeline from BAM
-//     fn create_fastq_cmd(&self) -> String {
-//         let bam = &self.bam;
-//         let fastq = format!(
-//             "{}/{}/{}/{}_{}.fastq.gz",
-//             self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
-//         );
-//         let samtools = format!("samtools fastq -@ 150 {bam}");
-//         let crabz = format!("crabz -f bgzf - -o {fastq}");
-//         format!("{samtools} | {crabz}")
-//     }
-//
-//     /// samtools merge command used in `merge_bam`
-//     fn merge_bam_cmd(&self, bam: &Path, into: &Path) -> String {
-//         format!(
-//             "{} merge -@ 160 -h {} {} {} {}",
-//             self.config.align.samtools_bin,
-//             bam.display(),
-//             into.display(),
-//             bam.display(),
-//             into.display() // placeholder, real tmp path is managed outside
-//         )
-//     }
-//
-//     // mux basecall + samtools view into muxed.bam
-//     fn from_mux_basecall_cmd(
-//         config: &Config,
-//         sequencing_kit: &str,
-//         pod_dir: &str,
-//         muxed_bam: &str,
-//     ) -> String {
-//         let dorado_bin = &config.align.dorado_bin;
-//         let dorado_arg = &config.align.dorado_basecall_arg;
-//         let ref_mmi = &config.align.ref_mmi;
-//         let samtools_bin = &config.align.samtools_bin;
-//         let samtools_view_threads = config.align.samtools_view_threads;
-//
-//         let dorado = format!(
-//             "{dorado_bin} basecaller --kit-name {sequencing_kit} {dorado_arg} {pod_dir} --emit-moves --trim all --reference {ref_mmi}"
-//         );
-//         let samtools_view =
-//             format!("{samtools_bin} view -h -@ {samtools_view_threads} -b -o {muxed_bam}");
-//         format!("{dorado} | {samtools_view}")
-//     }
-//
-//     /// samtools split command for demux
-//     fn demux_cmd(config: &Config, muxed_bam: &str, tmp_demux_dir: &str) -> String {
-//         format!(
-//             "{} split -@ {} -f '{}/%*_%!.bam' {}",
-//             config.align.samtools_bin, config.align.samtools_view_threads, tmp_demux_dir, muxed_bam
-//         )
-//     }
-//
-//     /// dorado aligner + samtools for realignment in from_mux
-//     fn realign_cmd(
-//         config: &Config,
-//         sequencing_kit: &str,
-//         barcode: &str,
-//         bam: &str,
-//         aligned_bam: &str,
-//     ) -> String {
-//         let dorado = format!(
-//             "{} aligner --threads {} {} {}",
-//             config.align.dorado_bin, config.align.dorado_aligner_threads, config.align.ref_fa, bam,
-//         );
-//         let samtools_view = format!(
-//             "{} view -h -@ {} -b /dev/stdin",
-//             config.align.samtools_bin, config.align.samtools_view_threads
-//         );
-//         let samtools_sort = format!(
-//             "{} sort -@ {} /dev/stdin -o {}",
-//             config.align.samtools_bin, config.align.samtools_sort_threads, aligned_bam
-//         );
-//         let _ = sequencing_kit; // not used here but kept for symmetry
-//         format!("{dorado} | {samtools_view} | {samtools_sort}")
-//     }
-//
-//     // ------------------------------------------------------------------
-//     // Workflow methods that now *run* the commands
-//     // ------------------------------------------------------------------
-//
-//     fn create_reference_mmi(&self) -> anyhow::Result<()> {
-//         if let Some(cmdline) = self.create_reference_mmi_cmd() {
-//             Self::run_shell(&cmdline)?;
-//         }
-//         Ok(())
-//     }
-//
-//     fn create_directories(&self) -> anyhow::Result<()> {
-//         if !std::path::Path::new(&self.case_dir).exists() {
-//             info!("Creating directory {}", self.case_dir);
-//             fs::create_dir(&self.case_dir)?;
-//         }
-//         if !std::path::Path::new(&self.time_dir).exists() {
-//             info!("Creating directory {}", self.time_dir);
-//             fs::create_dir(&self.time_dir)?;
-//         }
-//         Ok(())
-//     }
-//
-//     fn basecall_align(&mut self, dorado_bin: &str) -> anyhow::Result<()> {
-//         let pipe = self.basecall_align_cmd(dorado_bin)?;
-//         Self::run_shell(&pipe)
-//             .map_err(|e| anyhow::anyhow!("Failed to run pipe: {pipe}.\n\t{}", e.to_string()))
-//     }
-//
-//     pub fn index(&self) -> anyhow::Result<()> {
-//         let cmdline = self.index_cmd();
-//         info!("Running samtools index for {}", self.bam);
-//         Self::run_shell(&cmdline)
-//     }
-//
-//     pub fn run_cramino(&self) -> anyhow::Result<()> {
-//         let cramino_out = format!(
-//             "{}/{}_{}_hs1_cramino.txt",
-//             self.time_dir, self.case.id, self.case.time_point
-//         );
-//         info!("Quality control with cramino for BAM: {}", self.bam);
-//         let cmdline = self.cramino_cmd();
-//
-//         let output = cmd!("bash", "-c", &cmdline)
-//             .stdout_capture()
-//             .unchecked()
-//             .run()?;
-//
-//         fs::write(cramino_out, output.stdout)?;
-//         Ok(())
-//     }
-//
-//     pub fn run_modkit(&self) -> anyhow::Result<()> {
-//         let mod_summary = format!(
-//             "{}/{}_{}_5mC_5hmC_summary.txt",
-//             self.time_dir, self.case.id, self.case.time_point
-//         );
-//         info!("Generating base modification summary for BAM: {}", self.bam);
-//         let cmdline = self.modkit_cmd();
-//
-//         let output = cmd!("bash", "-c", &cmdline)
-//             .stdout_capture()
-//             .unchecked()
-//             .run()?;
-//
-//         fs::write(mod_summary, output.stdout)?;
-//         Ok(())
-//     }
-//
-//     pub fn create_fastq(&self) -> anyhow::Result<()> {
-//         let fastq = format!(
-//             "{}/{}/{}/{}_{}.fastq.gz",
-//             self.case_dir, self.case.id, self.case.time_point, self.case.id, self.case.time_point
-//         );
-//         if !std::path::Path::new(&fastq).exists() {
-//             let pipe = self.create_fastq_cmd();
-//             Self::run_shell(&pipe)?;
-//         }
-//         Ok(())
-//     }
-//
-//     pub fn merge_bam(&self, bam: &Path) -> anyhow::Result<()> {
-//         let composition_a: Vec<String> = bam_composition(bam.to_string_lossy().as_ref(), 20000)?
-//             .iter()
-//             .map(|(i, _, _)| i.clone())
-//             .collect();
-//         let composition_b: Vec<String> = bam_composition(&self.bam, 20000)?
-//             .iter()
-//             .map(|(i, _, _)| i.clone())
-//             .collect();
-//         let n_id = composition_a
-//             .iter()
-//             .filter(|id| composition_b.contains(id))
-//             .count();
-//         if n_id > 0 {
-//             warn!(
-//                 "{} is already merged, reads with the same run_id in the destination BAM.",
-//                 self.case.id
-//             );
-//             return Ok(());
-//         }
-//
-//         let into = PathBuf::from(&self.bam);
-//         let dir = into.parent().unwrap();
-//
-//         let original_file = into.file_name().unwrap().to_string_lossy().to_string();
-//         let original_i = dir.join(format!("{original_file}.bai"));
-//         if !original_i.exists() {
-//             self.index()?;
-//         }
-//
-//         let tmp_original_file = format!("{}.bam", Uuid::new_v4());
-//         let tmp_original = dir.join(tmp_original_file.clone());
-//         let tmp_original_i = dir.join(format!("{tmp_original_file}.bai"));
-//
-//         info!("Moving {} to {}", &into.display(), &tmp_original.display());
-//         fs::rename(&into, &tmp_original)?;
-//         info!(
-//             "Moving {} to {}",
-//             &original_i.display(),
-//             &tmp_original_i.display()
-//         );
-//         fs::rename(original_i, tmp_original_i.clone())?;
-//
-//         // real merge command with the correct tmp path
-//         let merge_cmdline = format!(
-//             "{} merge -@ 160 -h {} {} {} {}",
-//             self.config.align.samtools_bin,
-//             bam.display(),
-//             into.display(),
-//             bam.display(),
-//             tmp_original.display()
-//         );
-//         info!("Running {merge_cmdline}");
-//         Self::run_shell(&merge_cmdline)?;
-//
-//         fs::remove_file(tmp_original)?;
-//         fs::remove_file(tmp_original_i)?;
-//         fs::remove_file(bam)?;
-//
-//         self.index()?;
-//         Ok(())
-//     }
-//
-//     pub fn from_mux(cases: Vec<FlowCellCase>, config: Config) -> anyhow::Result<()> {
-//         // tmp dir
-//         let tmp_dir = format!("{}/.{}", config.result_dir, Uuid::new_v4());
-//         info!("Creating tmp dir {tmp_dir}");
-//         fs::create_dir(&tmp_dir)?;
-//
-//         // basecalling into muxed.bam
-//         let muxed_bam = format!("{tmp_dir}/muxed.bam");
-//         let pod_dir = cases[0].pod_dir.display().to_string();
-//
-//         let muxed_pod_dir = &cases.first().unwrap().pod_dir;
-//         let pod_path = fs::read_dir(muxed_pod_dir)
-//             .map_err(|e| {
-//                 anyhow::anyhow!(
-//                     "Failed to read pod5 dir: {}.\n\t{e}",
-//                     muxed_pod_dir.display()
-//                 )
-//             })?
-//             .filter_map(|p| p.ok())
-//             .map(|p| p.path())
-//             .filter(|p| p.extension().unwrap() == "pod5")
-//             .take(1)
-//             .collect::<Vec<PathBuf>>()
-//             .pop()
-//             .unwrap();
-//         let sequencing_kit = Pod5Info::from_pod5(pod_path.to_str().unwrap())
-//             .sequencing_kit
-//             .to_uppercase();
-//
-//         let basecall_pipe =
-//             Self::from_mux_basecall_cmd(&config, &sequencing_kit, &pod_dir, &muxed_bam);
-//         info!("Running: {basecall_pipe}");
-//         Self::run_shell(&basecall_pipe)?;
-//         info!("Basecalling ✅");
-//
-//         // demux
-//         let tmp_demux_dir = format!("{tmp_dir}/demuxed");
-//         fs::create_dir(&tmp_demux_dir)?;
-//         let demux_cmdline = Self::demux_cmd(&config, &muxed_bam, &tmp_demux_dir);
-//         info!("Demux from {sequencing_kit} into {tmp_demux_dir}");
-//         info!("Running: {demux_cmdline}");
-//         Self::run_shell(&demux_cmdline)?;
-//         info!("Demux ✅");
-//
-//         for case in cases.iter() {
-//             let barcode = case.barcode.replace("NB", "");
-//             let bam = find_unique_file(
-//                 &tmp_demux_dir,
-//                 &format!("{sequencing_kit}_barcode{}.bam", barcode),
-//             )?;
-//
-//             let aligned_bam = if !config.align.dorado_should_realign {
-//                 bam.clone()
-//             } else {
-//                 let aligned_bam = format!(
-//                     "{tmp_demux_dir}/{sequencing_kit}_barcode{}_aligned.bam",
-//                     barcode
-//                 );
-//                 let pipe =
-//                     Self::realign_cmd(&config, &sequencing_kit, &barcode, &bam, &aligned_bam);
-//                 info!("Running {pipe}");
-//                 Self::run_shell(&pipe)?;
-//                 info!("Alignement ✅");
-//                 aligned_bam.into()
-//             };
-//
-//             let d = Dorado::init(case.clone(), config.clone())?;
-//             d.create_directories()?;
-//
-//             if PathBuf::from(&d.bam).exists() {
-//                 info!("Merging");
-//                 d.merge_bam(&PathBuf::from(aligned_bam))?;
-//             } else {
-//                 info!("Moving from {} to {}", bam, d.bam);
-//                 fs::rename(aligned_bam, d.bam.clone())?;
-//                 d.index()?;
-//             }
-//
-//             d.run_cramino()?;
-//             d.run_modkit()?;
-//         }
-//         fs::remove_dir_all(tmp_dir)?;
-//
-//         Ok(())
-//     }
-//
-//     pub fn run_pipe(&mut self) -> anyhow::Result<()> {
-//         let start_time = std::time::SystemTime::now();
-//         self.start_time = start_time;
-//
-//         debug!("Running Dorado with config: {:#?}", self.config);
-//         let dorado_bin = self.config.align.dorado_bin.clone();
-//
-//         self.create_reference_mmi()?;
-//         self.create_directories()?;
-//
-//         info!(
-//             "Reading {} pod5 from: {}",
-//             self.case.time_point, self.config.pod_dir
-//         );
-//         let bam_path = std::path::Path::new(&self.bam);
-//
-//         if !bam_path.exists() {
-//             info!("Creating new bam file");
-//             self.basecall_align(&dorado_bin)?;
-//             self.index()?;
-//         } else {
-//             let new_bam_path = bam_path
-//                 .parent()
-//                 .unwrap()
-//                 .join(format!("{}.bam", Uuid::new_v4()));
-//             warn!("Creating new bam {}", new_bam_path.display());
-//
-//             let bam = self.bam.clone();
-//             self.bam = new_bam_path.clone().to_string_lossy().to_string();
-//             self.basecall_align(&dorado_bin)?;
-//             self.bam.clone_from(&bam);
-//             self.merge_bam(&new_bam_path)?;
-//         }
-//
-//         self.run_cramino()?;
-//         self.run_modkit()?;
-//         // self.create_fastq()?;
-//
-//         let end_time = std::time::SystemTime::now();
-//         self.end_time = end_time;
-//         let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
-//         info!(
-//             "Dorado and Minimap2 execution time: {} seconds",
-//             execution_time
-//         );
-//         self.is_done = true;
-//
-//         Ok(())
-//     }
-//
-//     // from_flowcell stays mostly as-is; it just calls run_pipe/from_mux
-//     pub fn from_flowcell(flowcell: &FlowCell, config: &Config) -> anyhow::Result<()> {
-//         let tp_conv = |time_point: &str| -> String {
-//             match time_point {
-//                 "normal" => config.normal_name.clone(),
-//                 "tumoral" => config.tumoral_name.clone(),
-//                 _ => panic!("Error time point name"),
-//             }
-//         };
-//         use crate::collection::flowcells::FlowCellLocation::*;
-//         let base_pod_dir = match &flowcell.location {
-//             Local(_) => None,
-//             Archived(pod_tar) => {
-//                 let file = File::open(pod_tar)
-//                     .map_err(|e| anyhow::anyhow!("Failed to open tar file: {pod_tar}\n\t{e}"))?;
-//                 let mut archive = tar::Archive::new(file);
-//                 info!("Un-tar of archived {pod_tar}");
-//                 archive
-//                     .unpack(&config.unarchive_tmp_dir)
-//                     .map_err(|e| anyhow::anyhow!("Failed to un-tar: {pod_tar}\n\t{e}"))?;
-//                 info!("Un-tar of archived {pod_tar} Done.");
-//
-//                 Some(config.unarchive_tmp_dir.to_string())
-//             }
-//         };
-//
-//         use crate::collection::flowcells::FlowCellExperiment::*;
-//         match &flowcell.experiment {
-//             WGSPod5Mux(pod_dir) => {
-//                 let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
-//                     format!("{base_pod_dir}/{pod_dir}")
-//                 } else {
-//                     pod_dir.clone()
-//                 };
-//
-//                 let cases = flowcell
-//                     .cases
-//                     .iter()
-//                     .map(|c| FlowCellCase {
-//                         id: c.case_id.clone(),
-//                         time_point: tp_conv(&c.sample_type),
-//                         barcode: c.barcode.clone(),
-//                         pod_dir: pod_dir.clone().into(),
-//                     })
-//                     .collect();
-//                 info!("Starting basecaller for muxed pod5: {cases:#?}");
-//
-//                 Dorado::from_mux(cases, config.clone())?;
-//             }
-//             WGSPod5Demux(pod_dir) => {
-//                 let pod_dir = if let Some(base_pod_dir) = &base_pod_dir {
-//                     format!("{base_pod_dir}/{pod_dir}")
-//                 } else {
-//                     pod_dir.clone()
-//                 };
-//
-//                 for c in flowcell.cases.iter() {
-//                     let pod_dir = format!("{pod_dir}/barcode{}", c.barcode.replace("NB", ""));
-//                     info!("Starting basecaller for demuxed pod5: {pod_dir}");
-//                     let mut d = Dorado::init(
-//                         FlowCellCase {
-//                             id: c.case_id.clone(),
-//                             time_point: tp_conv(&c.sample_type),
-//                             barcode: c.barcode.clone(),
-//                             pod_dir: pod_dir.into(),
-//                         },
-//                         config.clone(),
-//                     )?;
-//                     d.run_pipe()?;
-//                 }
-//             }
-//         }
-//
-//         Ok(())
-//     }
-// }

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است