Thomas 1 жил өмнө
parent
commit
bbc7d3ebf2

+ 79 - 88
src/commands/dorado.rs

@@ -1,6 +1,7 @@
 use std::{
     fs,
     io::{BufRead, BufReader, Read, Write},
+    path::Path,
     process::{ChildStderr, Command, Stdio},
     sync::{
         mpsc::{self, Sender},
@@ -10,8 +11,8 @@ use std::{
     time::{Duration, SystemTime},
 };
 
+use duct::cmd;
 use log::info;
-use pty_process::blocking::Pty;
 
 pub trait Run {
     fn run(self) -> anyhow::Result<()>;
@@ -24,6 +25,9 @@ pub struct DoradoConfig {
     pub name: String,
     pub time: String,
     pub pod_dir: String,
+    pub dorado_threads: u16,
+    pub samtools_view_threads: u16,
+    pub samtools_sort_threads: u16,
 }
 
 pub struct Dorado {
@@ -69,79 +73,43 @@ impl Dorado {
         pod_dir: &str,
         ref_mmi: &str,
         bam: &str,
+        dorado_threads: u16,
+        samtools_view_threads: u16,
+        samtools_sort_threads: u16,
     ) -> anyhow::Result<()> {
-        let (sender, receiver) = mpsc::channel();
-
-        let mut dorado = Command::new(dorado_bin)
-            .args([
-                "basecaller",
-                "sup,5mC_5hmC",
-                pod_dir,
-                "--trim",
-                "all",
-                "--reference",
-                ref_mmi,
-            ])
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()?;
-
-        // let stderr = dorado.stderr.take().expect("Failed to open stderr");
-
-        let dorado_stderr = Arc::new(Mutex::new(dorado.stderr.take().unwrap()));
-        let dorado_sender = sender.clone();
-        let dorado_stderr_clone = Arc::clone(&dorado_stderr);
-        thread::spawn(move || {
-            let _ = Self::print_stderr_live(dorado_stderr_clone, dorado_sender);
-        });
-
-        let mut view = Command::new("samtools")
-            .args(["view", "-h", "-@ 20", "-b", "/dev/stdin"])
-            .stdin(Stdio::from(dorado.stdout.take().unwrap()))
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()?;
-
-        let view_stderr = Arc::new(Mutex::new(view.stderr.take().unwrap()));
-        let view_sender = sender.clone();
-        let view_stderr_clone = Arc::clone(&view_stderr);
-        thread::spawn(move || {
-            if let Err(e) = Self::print_stderr_live(view_stderr_clone, view_sender) {
-                eprintln!("Error in view stderr thread: {}", e);
-            }
-        });
-
-        let mut sort = Command::new("samtools")
-            .args(["sort", "-@ 30", "/dev/stdin", "-o", bam])
-            .stdin(Stdio::from(view.stdout.take().unwrap()))
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()?;
-
-        let sort_stderr = Arc::new(Mutex::new(sort.stderr.take().unwrap()));
-        let sort_sender = sender.clone();
-        let sort_stderr_clone = Arc::clone(&sort_stderr);
-        thread::spawn(move || {
-            if let Err(e) = Self::print_stderr_live(sort_stderr_clone, sort_sender) {
-                eprintln!("Error in sort stderr thread: {}", e);
-            }
-        });
-
-        // Wait for all commands to finish
-        dorado.wait()?;
-        view.wait()?;
-        sort.wait()?;
+        let dorado = format!(
+            "{dorado_bin} basecaller sup,5mC_5hmC {pod_dir} --trim all --reference {ref_mmi}"
+        );
+        let samtools_view = format!("samtools view -h -@ {samtools_view_threads} -b /dev/stdin");
+        let samtools_sort = format!("samtools sort -@ {samtools_sort_threads} /dev/stdin -o {bam}");
+        let pipe = format!("{dorado} | {samtools_view} | {samtools_sort}");
+        let pipe_cmd = duct::cmd!("bash", "-c", pipe);
+        let mut reader = pipe_cmd.stdout_capture().reader()?;
 
-        // Run samtools index
-        let index_output = Command::new("samtools")
-            .args(["index", "-@ 150", bam])
-            .output()?;
+        let mut buffer = [0; 1];
+        let mut line = String::new();
 
-        self.print_stderr(BufReader::new(index_output.stderr.as_slice()), sender)?;
+        loop {
+            match reader.read(&mut buffer) {
+                Ok(0) => break, // End of output
+                Ok(_) => {
+                    let char = buffer[0] as char;
+                    eprint!("{}", char);
+                    std::io::stderr().flush()?;
 
-        // Collect all logs
-        for log in receiver {
-            self.log.push(log);
+                    if char == '\n' {
+                        // Send the complete line
+                        self.log.push(line.clone());
+                        line.clear();
+                    } else {
+                        line.push(char);
+                    }
+                }
+                Err(err) => {
+                    eprintln!("Error reading from stderr: {}", err);
+                    break;
+                }
+            }
         }
 
         Ok(())
@@ -149,21 +117,36 @@ impl Dorado {
 
     fn run_cramino(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
         let cramino_out = format!("{}/{}_{}_hs1_cramino.txt", time_dir, name, time);
-        if !std::path::Path::new(&cramino_out).exists() {
+        if !Path::new(&cramino_out).exists() {
             println!("[pipe] Quality control of BAM: {}", bam);
-            Command::new("cramino")
-                .args(["-t", "150", "--hist", "--checksum", "--karyotype", bam])
-                .output()?;
+            let output = duct::cmd!(
+                "cramino",
+                "-t",
+                "150",
+                "--hist",
+                "--checksum",
+                "--karyotype",
+                bam
+            )
+            .stdout_capture()
+            .unchecked()
+            .run()?;
+
+            fs::write(&cramino_out, output.stdout)?;
         }
         Ok(())
     }
 
     fn run_modkit(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
         let mod_summary = format!("{}/{}_{}_5mC_5hmC_summary.txt", time_dir, name, time);
-        if !std::path::Path::new(&mod_summary).exists() {
-            Command::new("modkit")
-                .args(["summary", "-t", "50", bam])
-                .output()?;
+        if !Path::new(&mod_summary).exists() {
+            println!("[pipe] Generating modification summary for BAM: {}", bam);
+            let output = cmd!("modkit", "summary", "-t", "50", bam)
+                .stdout_capture()
+                .unchecked()
+                .run()?;
+
+            fs::write(&mod_summary, output.stdout)?;
         }
         Ok(())
     }
@@ -177,15 +160,12 @@ impl Dorado {
     ) -> anyhow::Result<()> {
         let fastq = format!("{}/{}/{}/{}_{}.fastq.gz", case_dir, name, time, name, time);
         if !std::path::Path::new(&fastq).exists() {
-            let samtools_fastq = Command::new("samtools")
-                .args(["fastq", "-@ 150", bam])
-                .stdout(Stdio::piped())
-                .spawn()?;
-
-            Command::new("crabz")
-                .args(["-f", "bgzf", "-", "-o", &fastq])
-                .stdin(samtools_fastq.stdout.unwrap())
-                .output()?;
+            // samtools fastq -@ 150 "$bam" | crabz -f bgzf - -o "$fastq"
+            let samtools = format!("samtools fastq -@ 150 {bam}");
+            let crabz = format!("crabz -f bgzf - -o {fastq}");
+            let pipe = format!("{samtools} | {crabz}");
+            let pipe_cmd = duct::cmd!("bash", "-c", pipe);
+            pipe_cmd.run();
         }
         Ok(())
     }
@@ -259,6 +239,9 @@ impl Run for Dorado {
             pod_dir,
             ref_fa,
             ref_mmi,
+            dorado_threads,
+            samtools_view_threads,
+            samtools_sort_threads,
         } = self.config.clone();
 
         let data_dir = "/data/longreads_basic_pipe";
@@ -273,7 +256,15 @@ impl Run for Dorado {
         println!("Reading {} pod5 from: {}", time, pod_dir);
 
         if !std::path::Path::new(&bam).exists() {
-            self.run_dorado_and_samtools(dorado_bin, &pod_dir, &ref_mmi, &bam)?;
+            self.run_dorado_and_samtools(
+                dorado_bin,
+                &pod_dir,
+                &ref_mmi,
+                &bam,
+                dorado_threads,
+                samtools_view_threads,
+                samtools_sort_threads,
+            )?;
         }
 
         self.run_cramino(&bam, &time_dir, &name, &time)?;

+ 0 - 1
src/commands/dorado_pty.rs

@@ -70,7 +70,6 @@ impl DoradoPty {
         // let mut reader = big_cmd.stderr_to_stdout().reader()?;
         // let mut lines = BufReader::new(reader).lines();
 
-        warn!("dfsdf");
         //  while let Some(line) = lines.next() {
         //      match line {
         //          Ok(line) => info!("{line}"),

+ 0 - 1
src/commands/mod.rs

@@ -1,3 +1,2 @@
 pub mod dorado;
-pub mod dorado_pty;
 

+ 46 - 34
src/lib.rs

@@ -17,19 +17,26 @@ pub struct Collections {
     pub pod5: Pod5Collection,
     pub bam: BamCollection,
     pub vcf: VcfCollection,
-    pub tasks: Vec<CollectionsTasks>
+    pub tasks: Vec<CollectionsTasks>,
 }
 
 impl Collections {
     pub fn new(pod_dir: &str, corrected_fc_path: &str, result_dir: &str) -> anyhow::Result<Self> {
-        let pod5 = Pod5Collection::import_dir(
-            "/data/run_data",
-            "/data/flow_cells.tsv",
-            "/data/longreads_basic_pipe",
-        )?;
+        // let pod5 = Pod5Collection::import_dir(
+        //     "/data/run_data",
+        //     "/data/flow_cells.tsv",
+        //     "/data/longreads_basic_pipe",
+        // )?;
+        let pod5 = Pod5Collection::import_dir(pod_dir, corrected_fc_path, result_dir)?;
+
         let bam = BamCollection::new(result_dir);
         let vcf = load_vcf_collection(result_dir);
-        Ok(Self { pod5, bam, vcf, tasks: Vec::new() })
+        Ok(Self {
+            pod5,
+            bam,
+            vcf,
+            tasks: Vec::new(),
+        })
     }
 
     pub fn todo(&mut self) {
@@ -76,7 +83,6 @@ impl Collections {
             self.run();
         } else {
             while let Some(task) = self.tasks.pop() {
-                
                 task.run();
             }
         }
@@ -115,7 +121,11 @@ impl CollectionsTasks {
                         name: id.to_string(),
                         time: time_point.to_string(),
                         pod_dir: pod5_dir.display().to_string(),
-                    }).unwrap();
+                        dorado_threads: 150,
+                        samtools_view_threads: 20,
+                        samtools_sort_threads: 30,
+                    })
+                    .unwrap();
                     d.run().unwrap();
                 }
             }
@@ -132,11 +142,10 @@ impl CollectionsTasks {
 
 #[cfg(test)]
 mod tests {
-    use crate::{bam::BamType, commands::dorado_pty::DoradoPty};
-
-    use self::commands::dorado::{self, Run};
+    use self::commands::dorado;
 
     use super::*;
+    use crate::bam::BamType;
 
     #[test]
     fn it_works() {
@@ -144,17 +153,20 @@ mod tests {
         modkit::modkit(bam_path);
     }
 
-    // #[test]
-    // fn run_dorado() {
-    //     let d = dorado::Dorado::init(dorado::DoradoConfig {
-    //         ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-    //         ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
-    //         name: "CONSIGNY".to_string(),
-    //         time: "mrd".to_string(),
-    //         pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".to_string()
-    //     });
-    //     d.run();
-    // }
+    #[test]
+    fn run_dorado() -> anyhow::Result<()> {
+        let d = dorado::Dorado::init(dorado::DoradoConfig {
+            ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+            ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
+            name: "CONSIGNY".to_string(),
+            time: "mrd".to_string(),
+            pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".to_string(),
+            dorado_threads: 150,
+            samtools_view_threads: 20,
+            samtools_sort_threads: 30,
+        })?;
+        d.run()
+    }
 
     #[test]
     fn pod5() -> anyhow::Result<()> {
@@ -170,17 +182,17 @@ mod tests {
         Ok(())
     }
 
-    #[test_log::test]
-    fn pty() -> anyhow::Result<()> {
-        DoradoPty::new(dorado::DoradoConfig {
-            ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
-            ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
-            name: "CONSIGNY".to_string(),
-            time: "mrd".to_string(),
-            pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".to_string()
-        })?;
-        Ok(())
-    }
+    // #[test_log::test]
+    // fn pty() -> anyhow::Result<()> {
+    //     DoradoPty::new(dorado::DoradoConfig {
+    //         ref_fa: "/data/ref/hs1/chm13v2.0.fa".to_string(),
+    //         ref_mmi: "/data/ref/chm13v2.0.mmi".to_string(),
+    //         name: "CONSIGNY".to_string(),
+    //         time: "mrd".to_string(),
+    //         pod_dir: "/data/run_data/20240326-CL/CONSIGNY-MRD-NB07_RICCO-DIAG-NB08/20240326_1355_1E_PAU78333_bc25da25/pod5_pass/barcode07".to_string()
+    //     })?;
+    //     Ok(())
+    // }
 
     #[test_log::test]
     fn bam() -> anyhow::Result<()> {