|
|
@@ -1,17 +1,23 @@
|
|
|
use std::{
|
|
|
fs,
|
|
|
- io::{BufRead, BufReader},
|
|
|
- process::Command,
|
|
|
- time::SystemTime,
|
|
|
+ io::{BufRead, BufReader, Read, Write},
|
|
|
+ process::{ChildStderr, Command, Stdio},
|
|
|
+ sync::{
|
|
|
+ mpsc::{self, Sender},
|
|
|
+ Arc, Mutex,
|
|
|
+ },
|
|
|
+ thread,
|
|
|
+ time::{Duration, SystemTime},
|
|
|
};
|
|
|
|
|
|
use log::info;
|
|
|
+use pty_process::blocking::Pty;
|
|
|
|
|
|
pub trait Run {
|
|
|
- fn run(self);
|
|
|
+ fn run(self) -> anyhow::Result<()>;
|
|
|
}
|
|
|
|
|
|
-#[derive(Debug)]
|
|
|
+#[derive(Debug, Clone)]
|
|
|
pub struct DoradoConfig {
|
|
|
pub ref_fa: String,
|
|
|
pub ref_mmi: String,
|
|
|
@@ -29,161 +35,251 @@ pub struct Dorado {
|
|
|
}
|
|
|
|
|
|
impl Dorado {
|
|
|
- pub fn init(config: DoradoConfig) -> Self {
|
|
|
- Self {
|
|
|
+ pub fn init(config: DoradoConfig) -> anyhow::Result<Self> {
|
|
|
+ Ok(Self {
|
|
|
config,
|
|
|
start_time: SystemTime::now(),
|
|
|
end_time: SystemTime::now(),
|
|
|
is_done: false,
|
|
|
log: Vec::new(),
|
|
|
- }
|
|
|
+ })
|
|
|
}
|
|
|
-}
|
|
|
-
|
|
|
-impl Run for Dorado {
|
|
|
- fn run(mut self) {
|
|
|
- // Start timer
|
|
|
- let start_time = std::time::SystemTime::now();
|
|
|
- self.start_time = start_time;
|
|
|
-
|
|
|
- info!("Running Dorado with params: {:#?}", self.config);
|
|
|
-
|
|
|
- let name = &self.config.name;
|
|
|
- let time = &self.config.time;
|
|
|
- let pod_dir = &self.config.pod_dir;
|
|
|
-
|
|
|
- let ref_fa = &self.config.ref_fa;
|
|
|
- let ref_mmi = &self.config.ref_mmi;
|
|
|
-
|
|
|
- let data_dir = "/data/longreads_basic_pipe";
|
|
|
- let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
|
|
|
- let case_dir = format!("{}/{}", data_dir, name);
|
|
|
- let time_dir = format!("{}/{}", case_dir, time);
|
|
|
- let bam = format!("{}/{}_{}_hs1.bam", time_dir, name, time);
|
|
|
-
|
|
|
+ fn create_reference_mmi(&self, ref_mmi: &str, ref_fa: &str) -> anyhow::Result<()> {
|
|
|
if !std::path::Path::new(ref_mmi).exists() {
|
|
|
Command::new("minimap2")
|
|
|
.args(["-x", "map-ont", "-d", ref_mmi, ref_fa])
|
|
|
- .output()
|
|
|
- .expect("Failed to execute minimap2");
|
|
|
+ .output()?;
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
|
|
|
- if !std::path::Path::new(&case_dir).exists() {
|
|
|
- fs::create_dir(&case_dir).expect("Failed to create case directory");
|
|
|
+ fn create_directories(&self, case_dir: &str, time_dir: &str) -> anyhow::Result<()> {
|
|
|
+ if !std::path::Path::new(case_dir).exists() {
|
|
|
+ fs::create_dir(case_dir)?;
|
|
|
}
|
|
|
-
|
|
|
- if !std::path::Path::new(&time_dir).exists() {
|
|
|
- fs::create_dir(&time_dir).expect("Failed to create time directory");
|
|
|
+ if !std::path::Path::new(time_dir).exists() {
|
|
|
+ fs::create_dir(time_dir)?;
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
|
|
|
- println!("Reading {} pod5 from: {}", time, pod_dir);
|
|
|
+ fn run_dorado_and_samtools(
|
|
|
+ &mut self,
|
|
|
+ dorado_bin: &str,
|
|
|
+ pod_dir: &str,
|
|
|
+ ref_mmi: &str,
|
|
|
+ bam: &str,
|
|
|
+ ) -> anyhow::Result<()> {
|
|
|
+ let (sender, receiver) = mpsc::channel();
|
|
|
|
|
|
- if !std::path::Path::new(&bam).exists() {
|
|
|
- let dorado_output = Command::new(dorado_bin)
|
|
|
- .args([
|
|
|
- "basecaller",
|
|
|
- "sup,5mC_5hmC",
|
|
|
- pod_dir,
|
|
|
- "--trim",
|
|
|
- "all",
|
|
|
- "--reference",
|
|
|
- ref_mmi,
|
|
|
- ])
|
|
|
- .stdout(std::process::Stdio::piped())
|
|
|
- .stderr(std::process::Stdio::piped())
|
|
|
- .spawn()
|
|
|
- .expect("Failed to execute Dorado");
|
|
|
-
|
|
|
- if let Some(stderr) = dorado_output.stderr {
|
|
|
- print_stderr(stderr, &mut self.log);
|
|
|
+ let mut dorado = Command::new(dorado_bin)
|
|
|
+ .args([
|
|
|
+ "basecaller",
|
|
|
+ "sup,5mC_5hmC",
|
|
|
+ pod_dir,
|
|
|
+ "--trim",
|
|
|
+ "all",
|
|
|
+ "--reference",
|
|
|
+ ref_mmi,
|
|
|
+ ])
|
|
|
+ .stdout(Stdio::piped())
|
|
|
+ .stderr(Stdio::piped())
|
|
|
+ .spawn()?;
|
|
|
+
|
|
|
+ // let stderr = dorado.stderr.take().expect("Failed to open stderr");
|
|
|
+
|
|
|
+ let dorado_stderr = Arc::new(Mutex::new(dorado.stderr.take().unwrap()));
|
|
|
+ let dorado_sender = sender.clone();
|
|
|
+ let dorado_stderr_clone = Arc::clone(&dorado_stderr);
|
|
|
+ thread::spawn(move || {
|
|
|
+ let _ = Self::print_stderr_live(dorado_stderr_clone, dorado_sender);
|
|
|
+ });
|
|
|
+
|
|
|
+ let mut view = Command::new("samtools")
|
|
|
+ .args(["view", "-h", "-@ 20", "-b", "/dev/stdin"])
|
|
|
+ .stdin(Stdio::from(dorado.stdout.take().unwrap()))
|
|
|
+ .stdout(Stdio::piped())
|
|
|
+ .stderr(Stdio::piped())
|
|
|
+ .spawn()?;
|
|
|
+
|
|
|
+ let view_stderr = Arc::new(Mutex::new(view.stderr.take().unwrap()));
|
|
|
+ let view_sender = sender.clone();
|
|
|
+ let view_stderr_clone = Arc::clone(&view_stderr);
|
|
|
+ thread::spawn(move || {
|
|
|
+ if let Err(e) = Self::print_stderr_live(view_stderr_clone, view_sender) {
|
|
|
+ eprintln!("Error in view stderr thread: {}", e);
|
|
|
}
|
|
|
+ });
|
|
|
|
|
|
- let samtools_view_output = Command::new("samtools")
|
|
|
- .args(["view", "-h", "-@ 20", "-b", "/dev/stdin"])
|
|
|
- .stdin(dorado_output.stdout.unwrap())
|
|
|
- .stdout(std::process::Stdio::piped())
|
|
|
- .stderr(std::process::Stdio::piped())
|
|
|
- .spawn()
|
|
|
- .expect("Failed to execute samtools view");
|
|
|
+ let mut sort = Command::new("samtools")
|
|
|
+ .args(["sort", "-@ 30", "/dev/stdin", "-o", bam])
|
|
|
+ .stdin(Stdio::from(view.stdout.take().unwrap()))
|
|
|
+ .stdout(Stdio::piped())
|
|
|
+ .stderr(Stdio::piped())
|
|
|
+ .spawn()?;
|
|
|
|
|
|
- if let Some(stderr) = samtools_view_output.stderr {
|
|
|
- print_stderr(stderr, &mut self.log);
|
|
|
+ let sort_stderr = Arc::new(Mutex::new(sort.stderr.take().unwrap()));
|
|
|
+ let sort_sender = sender.clone();
|
|
|
+ let sort_stderr_clone = Arc::clone(&sort_stderr);
|
|
|
+ thread::spawn(move || {
|
|
|
+ if let Err(e) = Self::print_stderr_live(sort_stderr_clone, sort_sender) {
|
|
|
+ eprintln!("Error in sort stderr thread: {}", e);
|
|
|
}
|
|
|
+ });
|
|
|
+
|
|
|
+ // Wait for all commands to finish
|
|
|
+ dorado.wait()?;
|
|
|
+ view.wait()?;
|
|
|
+ sort.wait()?;
|
|
|
+
|
|
|
+ // Run samtools index
|
|
|
+ let index_output = Command::new("samtools")
|
|
|
+ .args(["index", "-@ 150", bam])
|
|
|
+ .output()?;
|
|
|
|
|
|
- Command::new("samtools")
|
|
|
- .args(["sort", "-@ 30", "/dev/stdin", "-o", &bam])
|
|
|
- .stdin(samtools_view_output.stdout.unwrap())
|
|
|
- .output()
|
|
|
- .expect("Failed to execute samtools sort");
|
|
|
-
|
|
|
- // Create a buffer to store the output
|
|
|
- // let stderr = dorado_output.stderr.take().expect("Failed to open stderr");
|
|
|
- // let mut reader = std::io::BufReader::new(stderr);
|
|
|
- // let mut output = String::new();
|
|
|
- //
|
|
|
- // loop {
|
|
|
- // // Read the output of the command
|
|
|
- // match reader.read_line(&mut output) {
|
|
|
- // Ok(0) => break, // End of output
|
|
|
- // Ok(_) => {
|
|
|
- // // Print or process the output here
|
|
|
- // print!("{}", output);
|
|
|
- // io::stderr().flush().unwrap();
|
|
|
- // // print!("{}[2J", 27 as char);
|
|
|
- // print!("\x1B[2J\x1B[1;1H");
|
|
|
- //
|
|
|
- // output.clear();
|
|
|
- // }
|
|
|
- // Err(err) => {
|
|
|
- // eprintln!("Error reading from stdout: {}", err);
|
|
|
- // break;
|
|
|
- // }
|
|
|
- // }
|
|
|
- //
|
|
|
- // // Optionally, you can add a delay to control refresh rate
|
|
|
- // std::thread::sleep(Duration::from_millis(100));
|
|
|
- // }
|
|
|
-
|
|
|
- Command::new("samtools")
|
|
|
- .args(["index", "-@ 150", &bam])
|
|
|
- .output()
|
|
|
- .expect("Failed to execute samtools index");
|
|
|
+ self.print_stderr(BufReader::new(index_output.stderr.as_slice()), sender)?;
|
|
|
+
|
|
|
+ // Collect all logs
|
|
|
+ for log in receiver {
|
|
|
+ self.log.push(log);
|
|
|
}
|
|
|
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ fn run_cramino(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
|
|
|
let cramino_out = format!("{}/{}_{}_hs1_cramino.txt", time_dir, name, time);
|
|
|
if !std::path::Path::new(&cramino_out).exists() {
|
|
|
println!("[pipe] Quality control of BAM: {}", bam);
|
|
|
-
|
|
|
Command::new("cramino")
|
|
|
- .args(["-t", "150", "--hist", "--checksum", "--karyotype", &bam])
|
|
|
- .output()
|
|
|
- .expect("Failed to execute cramino");
|
|
|
+ .args(["-t", "150", "--hist", "--checksum", "--karyotype", bam])
|
|
|
+ .output()?;
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
|
|
|
+ fn run_modkit(&self, bam: &str, time_dir: &str, name: &str, time: &str) -> anyhow::Result<()> {
|
|
|
let mod_summary = format!("{}/{}_{}_5mC_5hmC_summary.txt", time_dir, name, time);
|
|
|
if !std::path::Path::new(&mod_summary).exists() {
|
|
|
Command::new("modkit")
|
|
|
- .args(["summary", "-t", "50", &bam])
|
|
|
- .output()
|
|
|
- .expect("Failed to execute modkit summary");
|
|
|
+ .args(["summary", "-t", "50", bam])
|
|
|
+ .output()?;
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
|
|
|
+ fn create_fastq(
|
|
|
+ &self,
|
|
|
+ bam: &str,
|
|
|
+ case_dir: &str,
|
|
|
+ name: &str,
|
|
|
+ time: &str,
|
|
|
+ ) -> anyhow::Result<()> {
|
|
|
let fastq = format!("{}/{}/{}/{}_{}.fastq.gz", case_dir, name, time, name, time);
|
|
|
if !std::path::Path::new(&fastq).exists() {
|
|
|
- Command::new("samtools")
|
|
|
- .args(["fastq", "-@ 150", &bam])
|
|
|
- .stdout(std::process::Stdio::piped())
|
|
|
- .spawn()
|
|
|
- .expect("Failed to execute samtools fastq");
|
|
|
+ let samtools_fastq = Command::new("samtools")
|
|
|
+ .args(["fastq", "-@ 150", bam])
|
|
|
+ .stdout(Stdio::piped())
|
|
|
+ .spawn()?;
|
|
|
|
|
|
Command::new("crabz")
|
|
|
.args(["-f", "bgzf", "-", "-o", &fastq])
|
|
|
- .stdin(std::process::Stdio::piped())
|
|
|
- .output()
|
|
|
- .expect("Failed to execute crabz");
|
|
|
+ .stdin(samtools_fastq.stdout.unwrap())
|
|
|
+ .output()?;
|
|
|
}
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ fn print_stderr<R: BufRead>(
|
|
|
+ &mut self,
|
|
|
+ reader: R,
|
|
|
+ sender: Sender<String>,
|
|
|
+ ) -> anyhow::Result<()> {
|
|
|
+ for line in reader.lines() {
|
|
|
+ let line = line?;
|
|
|
+ eprintln!("{}", line);
|
|
|
+ sender.send(line.clone()).unwrap();
|
|
|
+ self.log.push(line);
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ fn print_stderr_live(
|
|
|
+ stderr: Arc<Mutex<ChildStderr>>,
|
|
|
+ sender: Sender<String>,
|
|
|
+ ) -> anyhow::Result<()> {
|
|
|
+ let mut lock = stderr.lock().unwrap();
|
|
|
+ let mut reader = BufReader::new(&mut *lock);
|
|
|
+ // let mut reader = BufReader::new(stderr.lock().unwrap());
|
|
|
+ let mut buffer = [0; 1];
|
|
|
+ let mut line = String::new();
|
|
|
+
|
|
|
+ loop {
|
|
|
+ match reader.read(&mut buffer) {
|
|
|
+ Ok(0) => break, // End of output
|
|
|
+ Ok(_) => {
|
|
|
+ let char = buffer[0] as char;
|
|
|
+ eprint!("{}", char);
|
|
|
+ std::io::stderr().flush()?;
|
|
|
+
|
|
|
+ if char == '\n' {
|
|
|
+ // Send the complete line
|
|
|
+ sender.send(line.trim().to_string())?;
|
|
|
+ line.clear();
|
|
|
+ } else {
|
|
|
+ line.push(char);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Err(err) => {
|
|
|
+ eprintln!("Error reading from stderr: {}", err);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Send any remaining content in the line
|
|
|
+ if !line.is_empty() {
|
|
|
+ sender.send(line.trim().to_string())?;
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Run for Dorado {
|
|
|
+ fn run(mut self) -> anyhow::Result<()> {
|
|
|
+ let start_time = std::time::SystemTime::now();
|
|
|
+ self.start_time = start_time;
|
|
|
+
|
|
|
+ info!("Running Dorado with params: {:#?}", self.config);
|
|
|
+
|
|
|
+ let DoradoConfig {
|
|
|
+ name,
|
|
|
+ time,
|
|
|
+ pod_dir,
|
|
|
+ ref_fa,
|
|
|
+ ref_mmi,
|
|
|
+ } = self.config.clone();
|
|
|
+
|
|
|
+ let data_dir = "/data/longreads_basic_pipe";
|
|
|
+ let dorado_bin = "/data/tools/dorado-0.7.2-linux-x64/bin/dorado";
|
|
|
+ let case_dir = format!("{}/{}", data_dir, name);
|
|
|
+ let time_dir = format!("{}/{}", case_dir, time);
|
|
|
+ let bam = format!("{}/{}_{}_hs1.bam", time_dir, name, time);
|
|
|
+
|
|
|
+ self.create_reference_mmi(&ref_mmi, &ref_fa)?;
|
|
|
+ self.create_directories(&case_dir, &time_dir)?;
|
|
|
+
|
|
|
+ println!("Reading {} pod5 from: {}", time, pod_dir);
|
|
|
+
|
|
|
+ if !std::path::Path::new(&bam).exists() {
|
|
|
+ self.run_dorado_and_samtools(dorado_bin, &pod_dir, &ref_mmi, &bam)?;
|
|
|
+ }
|
|
|
+
|
|
|
+ self.run_cramino(&bam, &time_dir, &name, &time)?;
|
|
|
+ self.run_modkit(&bam, &time_dir, &name, &time)?;
|
|
|
+ self.create_fastq(&bam, &case_dir, &name, &time)?;
|
|
|
|
|
|
- // Stop timer and calculate execution time
|
|
|
let end_time = std::time::SystemTime::now();
|
|
|
self.end_time = end_time;
|
|
|
let execution_time = end_time.duration_since(start_time).unwrap().as_secs_f64();
|
|
|
@@ -192,18 +288,20 @@ impl Run for Dorado {
|
|
|
execution_time
|
|
|
);
|
|
|
self.is_done = true;
|
|
|
- }
|
|
|
-}
|
|
|
|
|
|
-fn print_stderr(stderr: std::process::ChildStderr, save: &mut Vec<String>) {
|
|
|
- let stderr_reader = BufReader::new(stderr);
|
|
|
- for line in stderr_reader.lines() {
|
|
|
- match line {
|
|
|
- Ok(line) => {
|
|
|
- eprintln!("{}", line);
|
|
|
- save.push(line);
|
|
|
- }
|
|
|
- Err(err) => eprintln!("Error reading stderr: {}", err),
|
|
|
- }
|
|
|
+ Ok(())
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// fn print_stderr(stderr: std::process::ChildStderr, save: &mut Vec<String>) {
|
|
|
+// let stderr_reader = BufReader::new(stderr);
|
|
|
+// for line in stderr_reader.lines() {
|
|
|
+// match line {
|
|
|
+// Ok(line) => {
|
|
|
+// eprintln!("{}", line);
|
|
|
+// save.push(line);
|
|
|
+// }
|
|
|
+// Err(err) => eprintln!("Error reading stderr: {}", err),
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|