Thomas 1 年之前
父节点
当前提交
02ec23c7ae
共有 6 个文件被更改，包括 22 次插入和 233 次删除
  1. 14 91
      src/commands/dorado.rs
  2. 0 120
      src/commands/dorado_pty.rs
  3. 0 1
      src/commands/mod.rs
  4. 5 17
      src/lib.rs
  5. 0 1
      src/pod5.rs
  6. 3 3
      src/vcf.rs

+ 14 - 91
src/commands/dorado.rs

@@ -1,18 +1,12 @@
 use std::{
     fs,
-    io::{BufRead, BufReader, Read, Write},
+    io::{Read, Write},
     path::Path,
-    process::{ChildStderr, Command, Stdio},
-    sync::{
-        mpsc::{self, Sender},
-        Arc, Mutex,
-    },
-    thread,
-    time::{Duration, SystemTime},
+    time::SystemTime,
 };
 
 use duct::cmd;
-use log::info;
+use log::{info, warn};
 
 pub trait Run {
     fn run(self) -> anyhow::Result<()>;
@@ -50,9 +44,7 @@ impl Dorado {
     }
     fn create_reference_mmi(&self, ref_mmi: &str, ref_fa: &str) -> anyhow::Result<()> {
         if !std::path::Path::new(ref_mmi).exists() {
-            Command::new("minimap2")
-                .args(["-x", "map-ont", "-d", ref_mmi, ref_fa])
-                .output()?;
+            cmd!("minimap2", "-x", "map-ont", "-d", ref_mmi, ref_fa).run()?;
         }
         Ok(())
     }
@@ -73,7 +65,6 @@ impl Dorado {
         pod_dir: &str,
         ref_mmi: &str,
         bam: &str,
-        dorado_threads: u16,
         samtools_view_threads: u16,
         samtools_sort_threads: u16,
     ) -> anyhow::Result<()> {
@@ -83,7 +74,8 @@ impl Dorado {
         let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
         let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
         let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
-        let pipe_cmd = duct::cmd!("bash", "-c", pipe);
+        info!("Running: {pipe}");
+        let pipe_cmd = cmd!("bash", "-c", pipe);
         let mut reader = pipe_cmd.stdout_capture().reader()?;
 
         let mut buffer = [0; 1];
@@ -106,7 +98,7 @@ impl Dorado {
                     }
                 }
                 Err(err) => {
-                    eprintln!("Error reading from stderr: {}", err);
+                    warn!("Error reading from stderr: {}", err);
                     break;
                 }
             }
@@ -118,7 +110,7 @@ impl Dorado {
     fn run_cramino(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
         let cramino_out = format!("{}/{}_{}_hs1_cramino.txt", time_dir, name, time);
         if !Path::new(&cramino_out).exists() {
-            println!("[pipe] Quality control of BAM: {}", bam);
+            info!("Quality control of BAM: {}", bam);
             let output = duct::cmd!(
                 "cramino",
                 "-t",
@@ -140,7 +132,7 @@ impl Dorado {
     fn run_modkit(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
         let mod_summary = format!("{}/{}_{}_5mC_5hmC_summary.txt", time_dir, name, time);
         if !Path::new(&mod_summary).exists() {
-            println!("[pipe] Generating modification summary for BAM: {}", bam);
+            info!("Generating base modification summary for BAM: {}", bam);
             let output = cmd!("modkit", "summary", "-t", "50", bam)
                 .stdout_capture()
                 .unchecked()
@@ -160,70 +152,15 @@ impl Dorado {
     ) -> anyhow::Result<()> {
         let fastq = format!("{}/{}/{}/{}_{}.fastq.gz", case_dir, name, time, name, time);
         if !std::path::Path::new(&fastq).exists() {
-            // samtools fastq -@ 150 "$bam" | crabz -f bgzf - -o "$fastq"
             let samtools = format!("samtools fastq -@ 150 {bam}");
             let crabz = format!("crabz -f bgzf - -o {fastq}");
             let pipe = format!("{samtools} | {crabz}");
+            info!("Running: {pipe}");
             let pipe_cmd = duct::cmd!("bash", "-c", pipe);
-            pipe_cmd.run();
+            pipe_cmd.run()?;
         }
         Ok(())
     }
-
-    fn print_stderr<R: BufRead>(
-        &mut self,
-        reader: R,
-        sender: Sender<String>,
-    ) -> anyhow::Result<()> {
-        for line in reader.lines() {
-            let line = line?;
-            eprintln!("{}", line);
-            sender.send(line.clone()).unwrap();
-            self.log.push(line);
-        }
-        Ok(())
-    }
-
-    fn print_stderr_live(
-        stderr: Arc<Mutex<ChildStderr>>,
-        sender: Sender<String>,
-    ) -> anyhow::Result<()> {
-        let mut lock = stderr.lock().unwrap();
-        let mut reader = BufReader::new(&mut *lock);
-        // let mut reader = BufReader::new(stderr.lock().unwrap());
-        let mut buffer = [0; 1];
-        let mut line = String::new();
-
-        loop {
-            match reader.read(&mut buffer) {
-                Ok(0) => break, // End of output
-                Ok(_) => {
-                    let char = buffer[0] as char;
-                    eprint!("{}", char);
-                    std::io::stderr().flush()?;
-
-                    if char == '\n' {
-                        // Send the complete line
-                        sender.send(line.trim().to_string())?;
-                        line.clear();
-                    } else {
-                        line.push(char);
-                    }
-                }
-                Err(err) => {
-                    eprintln!("Error reading from stderr: {}", err);
-                    break;
-                }
-            }
-        }
-
-        // Send any remaining content in the line
-        if !line.is_empty() {
-            sender.send(line.trim().to_string())?;
-        }
-
-        Ok(())
-    }
 }
 
 impl Run for Dorado {
@@ -239,7 +176,7 @@ impl Run for Dorado {
             pod_dir,
             ref_fa,
             ref_mmi,
-            dorado_threads,
+            dorado_threads: _,
             samtools_view_threads,
             samtools_sort_threads,
         } = self.config.clone();
@@ -253,7 +190,7 @@ impl Run for Dorado {
         self.create_reference_mmi(&ref_mmi, &ref_fa)?;
         self.create_directories(&case_dir, &time_dir)?;
 
-        println!("Reading {} pod5 from: {}", time, pod_dir);
+        info!("Reading {} pod5 from: {}", time, pod_dir);
 
         if !std::path::Path::new(&bam).exists() {
             self.run_dorado_and_samtools(
@@ -261,7 +198,6 @@ impl Run for Dorado {
                 &pod_dir,
                 &ref_mmi,
                 &bam,
-                dorado_threads,
                 samtools_view_threads,
                 samtools_sort_threads,
             )?;
@@ -274,7 +210,7 @@ impl Run for Dorado {
         let end_time = std::time::SystemTime::now();
         self.end_time = end_time;
         let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
-        eprintln!(
+        info!(
             "Dorado and Minimap2 execution time: {} seconds",
             execution_time
         );
@@ -283,16 +219,3 @@ impl Run for Dorado {
         Ok(())
     }
 }
-
-// fn print_stderr(stderr: std::process::ChildStderr, save: &mut Vec<String>) {
-//     let stderr_reader = BufReader::new(stderr);
-//     for line in stderr_reader.lines() {
-//         match line {
-//             Ok(line) => {
-//                 eprintln!("{}", line);
-//                 save.push(line);
-//             }
-//             Err(err) => eprintln!("Error reading stderr: {}", err),
-//         }
-//     }
-// }

+ 0 - 120
src/commands/dorado_pty.rs

@@ -1,120 +0,0 @@
-use super::dorado::DoradoConfig;
-use anyhow::{Context, Result};
-use log::{info, warn};
-
-use std::{
-    fs,
-    io::{BufRead, BufReader, Read, Write},
-    process::{ChildStderr, Command, Stdio},
-    sync::{
-        mpsc::{self, Sender},
-        Arc, Mutex,
-    },
-    thread,
-    time::{Duration, SystemTime},
-};
-
-#[derive(Debug)]
-pub struct DoradoPty {
-    config: DoradoConfig,
-    start_time: SystemTime,
-    end_time: SystemTime,
-}
-impl DoradoPty {
-    pub fn new(config: DoradoConfig) -> Result<Self> {
-        // match unsafe { fork() }? {
-        //     ForkResult::Parent { child, .. } => {
-        //         // Wait for the child process to finish
-        //         println!("Parent process, spawned child with PID: {}", child);
-        //         let _ = nix::sys::wait::waitpid(child, None)?;
-        //     }
-        //     ForkResult::Child => {
-        //         // In the child process, execute a shell command
-        //         let shell = CString::new("/bin/sh").unwrap();
-        //         let arg1 = CString::new("-c").unwrap();
-        //         let command = CString::new("ls -a | wc -l")
-        //             .unwrap();
-        //
-        //         execvp(&shell, &[shell.clone(), arg1, command])?;
-        //     }
-        // }
-
-        let config = DoradoConfig {
-        ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-        ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
-        name: "MICHELAS".to_string(),
-        time: "diag".to_string(),
-        pod_dir: "/data/run_data/20240403-CL/CHENU-MRD-NB22_MICHELAS-DIAG-NB23/20240403_1431_1E_PAU78358_7b839f76/pod5_pass/barcode23".to_string(),
-    };
-        let DoradoConfig {
-            name,
-            time,
-            pod_dir,
-            ref_fa,
-            ref_mmi,
-        } = config.clone();
-
-        let data_dir = "/data/longreads_basic_pipe";
-        let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
-        let case_dir = format!("{}/{}", data_dir, name);
-        let time_dir = format!("{}/{}", case_dir, time);
-        let bam = format!("{}/{}_{}_hs1.bam", time_dir, name, time);
-        let dorado = format!(
-            "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}"
-        );
-        let samtools_view = format!("samtools view -h -@ 20 -b /dev/stdin");
-        let samtools_sort = format!("samtools sort -@ 30 /dev/stdin -o {bam}");
-        let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
-        let pipe_cmd = duct::cmd!("bash", "-c", pipe);
-        let mut reader = pipe_cmd.stdout_capture().reader()?;
-        // let mut reader = big_cmd.stderr_to_stdout().reader()?;
-        // let mut lines = BufReader::new(reader).lines();
-
-        //  while let Some(line) = lines.next() {
-        //      match line {
-        //          Ok(line) => info!("{line}"),
-        //          Err(e) => warn!("{e}"),
-        //      }
-        // }
-
-        // let stderr = command.stdout.take().expect("Failed to open stderr");
-        // let mut reader = BufReader::new(stderr);
-        let mut buffer = [0; 1];
-        let mut line = String::new();
-
-        loop {
-            match reader.read(&mut buffer) {
-                Ok(0) => break, // End of output
-                Ok(_) => {
-                    let char = buffer[0] as char;
-                    eprint!("{}", char);
-                    std::io::stderr().flush()?;
-
-                    if char == '\n' {
-                        // Send the complete line
-                        line.clear();
-                    } else {
-                        line.push(char);
-                    }
-                }
-                Err(err) => {
-                    eprintln!("Error reading from stderr: {}", err);
-                    break;
-                }
-            }
-        }
-
-        // if let Some(&mut stderr) = command.stdin.take() {
-        //     let mut stderr_output = String::new();
-        //     let r = BufReader::new(&mut stderr);
-        //     stderr.read_to_string(&mut stderr_output)?;
-        //     println!("Captured stderr: {}", stderr_output);
-        // }
-        //
-        Ok(Self {
-            config,
-            start_time: SystemTime::now(),
-            end_time: SystemTime::UNIX_EPOCH,
-        })
-    }
-}

+ 0 - 1
src/commands/mod.rs

@@ -1,2 +1 @@
 pub mod dorado;
-

+ 5 - 17
src/lib.rs

@@ -131,10 +131,10 @@ impl CollectionsTasks {
             }
 
             CollectionsTasks::CompleteBam {
-                id,
-                time_point,
-                pod5_type,
-                pod5_dir,
+                id: _,
+                time_point: _,
+                pod5_type: _,
+                pod5_dir: _,
             } => warn!("TODO"),
         }
     }
@@ -173,7 +173,7 @@ mod tests {
         let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
             .build();
 
-        let runs = Pod5Collection::import_dir(
+        let _ = Pod5Collection::import_dir(
             "/data/run_data",
             "/data/flow_cells.tsv",
             "/data/longreads_basic_pipe",
@@ -182,18 +182,6 @@ mod tests {
         Ok(())
     }
 
-    // #[test_log::test]
-    // fn pty() -> anyhow::Result<()> {
-    //     DoradoPty::new(dorado::DoradoConfig {
-    //         ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-    //         ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
-    //         name: "CONSIGNY".to_string(),
-    //         time: "mrd".to_string(),
-    //         pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".to_string()
-    //     })?;
-    //     Ok(())
-    // }
-
     #[test_log::test]
     fn bam() -> anyhow::Result<()> {
         let bam_collection = bam::load_bam_collection("/data/longreads_basic_pipe");

+ 0 - 1
src/pod5.rs

@@ -11,7 +11,6 @@ use std::{
     fs::{self, File, Metadata},
     os::unix::fs::MetadataExt,
     path::PathBuf,
-    str::FromStr,
     usize,
 };
 

+ 3 - 3
src/vcf.rs

@@ -95,9 +95,9 @@ pub struct VcfCollection {
 }
 
 impl VcfCollection {
-    pub fn print_tsv(&self) {
-        for vcf in self.vcfs.iter() {}
-    }
+    // pub fn print_tsv(&self) {
+    //     for vcf in self.vcfs.iter() {}
+    // }
 
     pub fn sort_by_id(&mut self) {
         self.vcfs.sort_by_key(|v| v.id.clone());