From 1975bc34c0613cce5dfb18f7412812a77c1a6049 Mon Sep 17 00:00:00 2001
From: Kai Zhang
Date: Wed, 13 Nov 2024 13:09:35 +0800
Subject: [PATCH] Async reader (#6)

* add async support

* revise the user interface

* allow input from url when possible
---
 .gitignore             |   1 +
 docs/api.rst           |   3 +-
 precellar/Cargo.toml   |   6 +-
 precellar/src/align.rs | 219 ++++++++++++++++++-----------------------
 precellar/src/qc.rs    |   6 +-
 python/Cargo.toml      |   8 +-
 python/src/lib.rs      |  54 +++++-----
 python/src/utils.rs    | 155 ++++++++++++++++++++++-------
 seqspec/Cargo.toml     |  11 ++-
 seqspec/src/lib.rs     |   6 +-
 seqspec/src/read.rs    |   4 +-
 seqspec/src/region.rs  |   2 +-
 seqspec/src/utils.rs   | 143 ++++++++++++++++++-------
 13 files changed, 376 insertions(+), 242 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7289f4e..b29038d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 target
 *.gz
+*.zst
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/docs/api.rst b/docs/api.rst
index be793a5..8ee48b9 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -32,4 +32,5 @@ Utilities
 .. autosummary::
     :toctree: _autosummary
 
-    utils.strip_barcode_from_fastq
\ No newline at end of file
+    utils.strip_barcode_from_fastq
+    utils.bam_to_fastq
\ No newline at end of file
diff --git a/precellar/Cargo.toml b/precellar/Cargo.toml
index 1adfcca..c23f016 100644
--- a/precellar/Cargo.toml
+++ b/precellar/Cargo.toml
@@ -6,15 +6,15 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0"
 bed-utils = "0.5.1"
-bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "07eda9b9c2815ae52b3fa30b01de0e19fae31fe0" }
-star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "f9915ea3afbac1e8f4773e2e7c22376f1549c3c7" }
+bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "8de06bcc0a2145fd819232ffb2bf100fb795db30" }
+star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "faef1085eaf26e6e8d5875fcbc641c3af9444d89" }
 bstr = "1.0"
 either = "1.13"
 itertools = "0.13"
 indexmap = "2.5"
 log = "0.4"
 lexical = "6.1"
-noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "async"] }
+noodles = { version = "0.85", features = ["core", "fastq", "bam", "sam", "async"] }
 kdam = "0.5.2"
 rayon = "1.10"
 smallvec = "1.13"
diff --git a/precellar/src/align.rs b/precellar/src/align.rs
index 4fe924c..5ea6c8a 100644
--- a/precellar/src/align.rs
+++ b/precellar/src/align.rs
@@ -8,9 +8,10 @@ use indexmap::IndexMap;
 use kdam::{tqdm, BarExt};
 use log::info;
 use noodles::sam::alignment::record_buf::{data::field::value::Value, RecordBuf};
-use noodles::sam::alignment::{record::data::field::tag::Tag, Record};
+use noodles::sam::alignment::record::data::field::tag::Tag;
 use noodles::{bam, fastq, sam};
 use rayon::iter::ParallelIterator;
+use rayon::slice::ParallelSlice;
 use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType};
 use smallvec::SmallVec;
 use star_aligner::StarAligner;
@@ -50,19 +51,17 @@ impl AsIterator for Vec<RecordBuf> {
 pub trait Aligner {
     type AlignOutput: AsIterator;
 
-    fn chunk_size(&self) -> usize;
-
     fn header(&self) -> sam::Header;
 
     fn align_reads(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<Self::AlignOutput>;
 
     fn align_read_pairs(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<(Self::AlignOutput, Self::AlignOutput)>;
 }
@@ -72,21 +71,17 @@ pub struct DummyAligner;
 impl Aligner for DummyAligner {
     type AlignOutput = RecordBuf;
 
-    fn chunk_size(&self) -> usize {
-        0
-    }
-
     fn header(&self) -> 
sam::Header {
        sam::Header::default()
    }
 
-    fn align_reads(&mut self, _: &sam::Header, _: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
+    fn align_reads(&mut self, _: u16, _: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
         Vec::new()
     }
 
     fn align_read_pairs(
         &mut self,
-        _: &sam::Header,
+        _: u16,
         _: Vec<AnnotatedRecord>,
     ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
         Vec::new()
     }
 }
@@ -96,17 +91,13 @@ impl Aligner for BurrowsWheelerAligner {
     type AlignOutput = RecordBuf;
 
-    fn chunk_size(&self) -> usize {
-        self.chunk_size()
-    }
-
     fn header(&self) -> sam::Header {
         self.get_sam_header()
     }
 
     fn align_reads(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<Self::AlignOutput> {
         let (info, mut reads): (Vec<_>, Vec<_>) = records
@@ -115,25 +106,24 @@ impl Aligner for BurrowsWheelerAligner {
             .unzip();
 
         // TODO: add UMI
-        self.align_reads(reads.as_mut_slice())
+        self.align_reads(num_threads, reads.as_mut_slice())
             .enumerate()
-            .map(|(i, alignment)| {
+            .map(|(i, mut alignment)| {
                 let (bc, umi) = info.get(i).unwrap();
                 add_cell_barcode(
-                    header,
-                    &alignment,
+                    &mut alignment,
                     bc.raw.sequence(),
                     bc.raw.quality_scores(),
                     bc.corrected.as_deref(),
-                )
-                .unwrap()
+                );
+                alignment
             })
             .collect()
     }
 
     fn align_read_pairs(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
         let (info, mut reads): (Vec<_>, Vec<_>) = records
@@ -145,27 +135,23 @@ impl Aligner for BurrowsWheelerAligner {
             )
             })
             .unzip();
-        self.align_read_pairs(&mut reads)
+        self.align_read_pairs(num_threads, &mut reads)
             .enumerate()
-            .map(|(i, (ali1, ali2))| {
+            .map(|(i, (mut ali1, mut ali2))| {
                 let (bc, umi) = info.get(i).unwrap();
-                let ali1_ = add_cell_barcode(
-                    &header,
-                    &ali1,
+                add_cell_barcode(
+                    &mut ali1,
                     bc.raw.sequence(),
                     bc.raw.quality_scores(),
                     bc.corrected.as_deref(),
-                )
-                .unwrap();
-                let ali2_ = add_cell_barcode(
-                    &header,
-                    &ali2,
+                );
+                add_cell_barcode(
+                    &mut ali2,
                     bc.raw.sequence(),
                     bc.raw.quality_scores(),
                     bc.corrected.as_deref(),
-                )
-                .unwrap();
-                (ali1_, ali2_)
+                );
+                (ali1, ali2)
             })
             .collect()
     }
@@ -174,98 +160,70 @@ impl Aligner for StarAligner {
 impl Aligner for StarAligner {
     type AlignOutput = Vec<RecordBuf>;
 
-    fn chunk_size(&self) -> usize {
-        0
-    }
-
     fn header(&self) -> sam::Header {
         self.get_header().clone()
     }
 
     fn align_reads(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<Self::AlignOutput> {
-        let (info, mut reads): (Vec<_>, Vec<_>) = records
-            .into_iter()
-            .map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap()))
-            .unzip();
-
-        // TODO: StarAligner can expose a method to align a single read instead of a batch,
-        // so that barcode and UMI processing can be done in parallel.
-        StarAligner::align_reads(self, reads.as_mut_slice())
-            .collect::<Vec<_>>()
-            .into_iter()
-            .enumerate()
-            .map(|(i, alignment)| {
-                let (bc, umi) = info.get(i).unwrap();
-                alignment
-                    .into_iter()
-                    .map(|x| {
-                        add_cell_barcode(
-                            header,
-                            &x,
-                            bc.raw.sequence(),
-                            bc.raw.quality_scores(),
-                            bc.corrected.as_deref(),
-                        )
-                        .unwrap()
-                    })
-                    .collect()
+        let chunk_size = get_chunk_size(records.len(), num_threads as usize);
+
+        records.par_chunks(chunk_size).flat_map_iter(|chunk| {
+            let mut aligner = self.clone();
+            chunk.iter().map(move |rec| {
+                let bc = rec.barcode.as_ref().unwrap();
+                let mut ali = aligner.align_read(&rec.read1.as_ref().unwrap()).unwrap();
+                ali.iter_mut().for_each(|alignment|
+                    add_cell_barcode(
+                        alignment,
+                        bc.raw.sequence(),
+                        bc.raw.quality_scores(),
+                        bc.corrected.as_deref(),
+                    )
+                );
+                ali
             })
-            .collect()
+        }).collect()
     }
 
     fn align_read_pairs(
         &mut self,
-        header: &sam::Header,
+        num_threads: u16,
         records: Vec<AnnotatedRecord>,
     ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
-        let (info, mut reads): (Vec<_>, Vec<_>) = records
-            .into_iter()
-            .map(|rec| {
-                (
-                    (rec.barcode.unwrap(), rec.umi),
-                    (rec.read1.unwrap(), rec.read2.unwrap()),
-                )
-            })
-            .unzip();
-        StarAligner::align_read_pairs(self, &mut reads)
-            .collect::<Vec<_>>()
-            .into_iter()
-            .enumerate()
-            .map(|(i, (ali1, ali2))| {
-                let (bc, umi) = info.get(i).unwrap();
-                let ali1_ = ali1
-                    .into_iter()
-                    .map(|x| {
-                        add_cell_barcode(
-                            &header,
-                            &x,
-                            bc.raw.sequence(),
-                            bc.raw.quality_scores(),
-                            bc.corrected.as_deref(),
-                        )
-                        .unwrap()
-                    })
-                    .collect();
-                let ali2_ = ali2
-                    .into_iter()
-                    .map(|x| {
-                        add_cell_barcode(
-                            &header,
-                            &x,
-                            bc.raw.sequence(),
-                            bc.raw.quality_scores(),
-                            bc.corrected.as_deref(),
-                        )
-                        .unwrap()
-                    })
-                    .collect();
-                (ali1_, ali2_)
-            })
-            .collect()
+        let chunk_size = get_chunk_size(records.len(), num_threads as usize);
+
+        records.par_chunks(chunk_size).flat_map_iter(|chunk| {
+            let mut aligner = self.clone();
+            chunk.iter().map(move |rec| {
+                let bc = rec.barcode.as_ref().unwrap();
+                let (mut ali1, mut ali2) = aligner.align_read_pair(
+                    &rec.read1.as_ref().unwrap(),
+                    &rec.read2.as_ref().unwrap()
+                ).unwrap();
+                ali1.iter_mut().chain(ali2.iter_mut()).for_each(|alignment|
+                    add_cell_barcode(
+                        alignment,
+                        bc.raw.sequence(),
+                        bc.raw.quality_scores(),
+                        bc.corrected.as_deref(),
+                    )
+                );
+                (ali1, ali2)
+            }).collect::<Vec<_>>()
+        }).collect()
+    }
+}
+
+fn get_chunk_size(total_length: usize, num_threads: usize) -> usize {
+    let chunk_size = total_length / num_threads;
+    if chunk_size == 0 {
+        1
+    } else {
+        chunk_size
+    }
 }
 
@@ -330,8 +288,23 @@ impl<A: Aligner> FastqProcessor<A> {
         metrics
     }
 
+    /// Align reads and return the alignments.
+    ///
+    /// # Arguments
+    ///
+    /// * `num_threads` - The number of threads to use for alignment.
+    /// * `chunk_size` - The maximum number of bases in a chunk.
+    ///
+    /// # Returns
+    ///
+    /// An iterator of alignments. If the fastq records are paired-end, each item is a
+    /// vector of alignment pairs; otherwise, each item is a vector of alignments.
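+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, mirroring the test at the bottom of this file (`spec` and
+    /// `aligner` stand for an already-loaded `Assay` and a constructed aligner):
+    ///
+    /// ```ignore
+    /// let mut processor = FastqProcessor::new(spec, aligner).with_modality(Modality::ATAC);
+    /// processor
+    ///     .gen_barcoded_alignments(8, 50000) // 8 threads, at most 50000 bases per chunk
+    ///     .for_each(|x| println!("{:?}", x));
+    /// ```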
 pub fn gen_barcoded_alignments(
         &mut self,
+        num_threads: u16,
+        chunk_size: usize,
     ) -> impl Iterator<Item = Either<Vec<A::AlignOutput>, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ {
         let fq_reader = self.gen_barcoded_fastq(true);
@@ -349,11 +322,11 @@ impl<A: Aligner> FastqProcessor<A> {
         );
 
         let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0));
-        let fq_reader = VectorChunk::new(fq_reader, self.aligner.chunk_size());
+        let fq_reader = VectorChunk::new(fq_reader, chunk_size);
         fq_reader.map(move |data| {
             let align_qc = self.align_qc.get_mut(&modality).unwrap();
             if is_paired {
-                let results: Vec<_> = self.aligner.align_read_pairs(&header, data);
+                let results: Vec<_> = self.aligner.align_read_pairs(num_threads, data);
                 results.iter().for_each(|(ali1, ali2)| {
                     ali1.as_iter().for_each(|x| align_qc.update(x, &header));
                     ali2.as_iter().for_each(|x| align_qc.update(x, &header));
                 });
                 progress_bar.update(results.len()).unwrap();
                 Either::Right(results)
             } else {
-                let results: Vec<_> = self.aligner.align_reads(&header, data);
+                let results: Vec<_> = self.aligner.align_reads(num_threads, data);
                 results.iter().for_each(|ali| {
                     ali.as_iter().for_each(|x| align_qc.update(x, &header));
                 });
@@ -720,6 +693,7 @@ pub struct AnnotatedRecord {
 }
 
 impl AnnotatedRecord {
+    /// The total number of bases, including read1 and read2, in the record.
     pub fn len(&self) -> usize {
         self.read1.as_ref().map_or(0, |x| x.sequence().len())
             + self.read2.as_ref().map_or(0, |x| x.sequence().len())
@@ -767,7 +741,7 @@ impl AnnotatedRecord {
 
 pub struct VectorChunk<I> {
     inner: I,
-    chunk_size: usize,
+    chunk_size: usize, // The maximum number of bases in a chunk
 }
 
 impl<I> VectorChunk<I> {
@@ -799,14 +773,12 @@ impl<I: Iterator<Item = AnnotatedRecord>> Iterator for VectorChunk<I> {
         }
     }
 }
 
-fn add_cell_barcode<R: Record>(
-    header: &sam::Header,
-    record: &R,
+fn add_cell_barcode(
+    record_buf: &mut RecordBuf,
     ori_barcode: &[u8],
     ori_qual: &[u8],
     correct_barcode: Option<&[u8]>,
-) -> std::io::Result<RecordBuf> {
-    let mut record_buf = RecordBuf::try_from_alignment_record(header, record)?;
+) {
     let data = record_buf.data_mut();
     data.insert(
         Tag::CELL_BARCODE_SEQUENCE,
@@ -819,7 +791,6 @@ fn add_cell_barcode(
     if let Some(barcode) = correct_barcode {
         data.insert(Tag::CELL_BARCODE_ID, Value::String(barcode.into()));
     }
-    Ok(record_buf)
 }
 
 fn slice_fastq_record(record: &fastq::Record, start: usize, end: usize) -> fastq::Record {
@@ -916,7 +887,7 @@ mod tests {
         );
 
         let mut processor = FastqProcessor::new(spec, aligner).with_modality(Modality::ATAC);
-        processor.gen_barcoded_alignments().take(6).for_each(|x| {
+        processor.gen_barcoded_alignments(8, 50000).take(6).for_each(|x| {
             println!("{:?}", x);
         });
 
diff --git a/precellar/src/qc.rs b/precellar/src/qc.rs
index e782bda..7400dc4 100644
--- a/precellar/src/qc.rs
+++ b/precellar/src/qc.rs
@@ -169,17 +169,15 @@ impl AlignQC {
             self.num_read2_bases += record.sequence().len() as u64;
             self.num_read2_q30_bases += record
                 .quality_scores()
-                .as_ref()
                 .iter()
-                .filter(|x| *x >= 30)
+                .filter(|s| s.as_ref().map(|x| *x >= 30).unwrap_or(false))
                 .count() as u64;
         } else {
             self.num_read1_bases += record.sequence().len() as u64;
             self.num_read1_q30_bases += record
                 .quality_scores()
-                .as_ref()
                 .iter()
-                .filter(|x| *x >= 30)
+                .filter(|s| s.as_ref().map(|x| *x >= 30).unwrap_or(false))
                 .count() as u64;
         }
 
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 51711d2..f4c15ca 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -10,12 +10,12 @@ crate-type = ["cdylib"]
 
 [dependencies]
 anyhow = "1.0"
-bwa-mem2 = { git = 
"https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "07eda9b9c2815ae52b3fa30b01de0e19fae31fe0" } -star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "f9915ea3afbac1e8f4773e2e7c22376f1549c3c7" } +bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "8de06bcc0a2145fd819232ffb2bf100fb795db30" } +star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "faef1085eaf26e6e8d5875fcbc641c3af9444d89" } bstr = "1.0" either = "1.13" itertools = "0.13" -noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "bgzf"] } +noodles = { version = "0.85", features = ["core", "fastq", "bam", "sam", "bgzf"] } seqspec = { version = "0.1", workspace = true } serde_yaml = "0.9" termtree = "0.5" @@ -24,6 +24,8 @@ regex = "1.6" log = "0.4" env_logger = "0.11" url = "2.5" +tokio = { version = "1", features = ["rt", "rt-multi-thread"] } +futures = "0.3" [dependencies.pyo3] version = "0.22.3" diff --git a/python/src/lib.rs b/python/src/lib.rs index 620913c..90ebbcf 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -21,7 +21,7 @@ use ::precellar::{ }; use pyseqspec::Assay; use seqspec::{ - utils::{open_file_for_write, Compression}, + utils::{create_file, Compression}, Modality, }; @@ -78,6 +78,9 @@ fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { /// The temporary directory to use. /// num_threads: int /// The number of threads to use. +/// chunk_size: int +/// This parameter is used to control the number of bases processed in each chunk. +/// The actual value is determined by: chunk_size * num_threads. /// /// Returns /// ------- @@ -91,14 +94,14 @@ fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { mito_dna=vec!["chrM".to_owned(), "M".to_owned()], shift_left=4, shift_right=-5, compression=None, compression_level=None, - temp_dir=None, num_threads=8, + temp_dir=None, num_threads=8, chunk_size=10000000, ), text_signature = "(assay, genome_index, *, modality, output_bam=None, output_fragment=None, mito_dna=['chrM', 'M'], shift_left=4, shift_right=-5, compression=None, compression_level=None, - temp_dir=None, num_threads=8)", + temp_dir=None, num_threads=8, chunk_size=10000000)", )] fn align( py: Python<'_>, @@ -113,7 +116,8 @@ fn align( compression: Option<&str>, compression_level: Option, temp_dir: Option, - num_threads: u32, + num_threads: u16, + chunk_size: usize, ) -> Result> { assert!( output_bam.is_some() || output_fragment.is_some(), @@ -128,7 +132,7 @@ fn align( let aligner = BurrowsWheelerAligner::new( FMIndex::read(genome_index).unwrap(), - AlignerOpts::default().with_n_threads(num_threads as usize), + AlignerOpts::default(), PairedEndStats::default(), ); let header = aligner.header(); @@ -150,21 +154,23 @@ fn align( anyhow::Ok(writer) }) .transpose()?; - let alignments = processor.gen_barcoded_alignments().map(|data| { - py.check_signals().unwrap(); - if let Some(writer) = &mut bam_writer { - match data.as_ref() { - Either::Left(chunk) => chunk - .iter() - .for_each(|x| writer.write_alignment_record(&header, x).unwrap()), - Either::Right(chunk) => chunk.iter().for_each(|(a, b)| { - writer.write_alignment_record(&header, a).unwrap(); - writer.write_alignment_record(&header, b).unwrap(); - }), - }; - } - data - }); + let alignments = processor + .gen_barcoded_alignments(num_threads, chunk_size) + .map(|data| { + py.check_signals().unwrap(); + if let Some(writer) = &mut bam_writer { + match data.as_ref() { + Either::Left(chunk) => 
chunk
+                        .iter()
+                        .for_each(|x| writer.write_alignment_record(&header, x).unwrap()),
+                    Either::Right(chunk) => chunk.iter().for_each(|(a, b)| {
+                        writer.write_alignment_record(&header, a).unwrap();
+                        writer.write_alignment_record(&header, b).unwrap();
+                    }),
+                };
+            }
+            data
+        });
 
     let fragment_writer = output_fragment
         .as_ref()
         .map(|output| {
             let compression = compression
                 .map(|x| Compression::from_str(x).unwrap())
                 .or(output.try_into().ok());
-            open_file_for_write(output, compression, compression_level, num_threads)
+            create_file(output, compression, compression_level, num_threads as u32)
         })
         .transpose()?;
     if let Some(mut writer) = fragment_writer {
@@ -264,7 +270,7 @@ fn make_fragment(
     let compression = compression
         .map(|x| Compression::from_str(x).unwrap())
         .or((&output).try_into().ok());
-    let mut writer = open_file_for_write(output, compression, compression_level, num_threads)?;
+    let mut writer = create_file(output, compression, compression_level, num_threads)?;
 
     let mut fragment_generator = FragmentGenerator::default();
     if let Some(dir) = temp_dir {
@@ -325,11 +331,11 @@ fn make_fastq(
     std::fs::create_dir_all(&out_dir)?;
 
     let read1_fq = out_dir.join("R1.fq.zst");
-    let read1_writer = open_file_for_write(read1_fq, Some(Compression::Zstd), None, 8)?;
+    let read1_writer = create_file(read1_fq, Some(Compression::Zstd), None, 8)?;
     let mut read1_writer = Writer::new(BufWriter::new(read1_writer));
     let mut read2_writer = if fq_reader.is_paired_end() {
         let read2_fq = out_dir.join("R2.fq.zst");
-        let read2_writer = open_file_for_write(read2_fq, Some(Compression::Zstd), None, 8)?;
+        let read2_writer = create_file(read2_fq, Some(Compression::Zstd), None, 8)?;
         let read2_writer = Writer::new(BufWriter::new(read2_writer));
         Some(read2_writer)
     } else {
diff --git a/python/src/utils.rs b/python/src/utils.rs
index ec557e2..13f5eb7 100644
--- a/python/src/utils.rs
+++ b/python/src/utils.rs
@@ -1,10 +1,14 @@
-use std::{io::{BufReader, BufWriter}, path::PathBuf, str::FromStr};
+use std::{io::BufWriter, path::PathBuf, str::FromStr};
 use precellar::utils::strip_fastq;
-use seqspec::utils::{open_file_for_read, open_file_for_write, Compression};
-use noodles::fastq::{self, Reader, io::Writer};
+use seqspec::utils::{open_file_async, is_url, create_file, Compression};
+use noodles::fastq::{self, io::Writer};
+use noodles::bam;
 use anyhow::Result;
 use pyo3::{prelude::*, types::PyDict};
 use regex::Regex;
+use tokio::runtime::Runtime;
+use futures::{StreamExt, TryStreamExt};
+use noodles::sam::alignment::record::QualityScores;
 
 /// Remove barcode from the read names of fastq records.
 ///
@@ -14,8 +18,8 @@ use regex::Regex;
 ///
 /// Parameters
 /// ----------
-/// in_fq: Path
-///     File path to the input fastq file.
+/// in_fq: str
+///     File path or URL to the input fastq file.
 /// out_fq: Path
 ///     File path to the output fastq file.
 /// regex: str
@@ -29,10 +33,13 @@ use regex::Regex;
 ///     If None, none of the barcodes will be written to a file.
 /// from_description: bool
 ///     If True, the barcode will be extracted from the description instead of the name.
-/// compression: str | None
+/// compression: Literal['gzip', 'zst'] | None
 ///     Compression algorithm to use. If None, the compression algorithm will be inferred from the file extension.
 /// compression_level: int | None
 ///     Compression level to use.
+/// input_compression: Literal['gzip', 'zst'] | None
+///     Compression algorithm of the input fastq file. This has to be specified
+///     if the input fastq file is compressed and fetched from a URL.
/// num_threads: int
///     The number of threads to use.
///
@@ -86,29 +93,34 @@ use regex::Regex;
 #[pyo3(
     signature = (in_fq, out_fq, *, regex, out_barcode=None, from_description=false,
-        compression=None, compression_level=None, num_threads=8,
+        compression=None, compression_level=None, input_compression=None, num_threads=8
     ),
     text_signature = "(in_fq, out_fq, *, regex, out_barcode, from_description=False,
-        compression=None, compression_level=None, num_threads=8)",
+        compression=None, compression_level=None, input_compression=None, num_threads=8)",
 )]
 fn strip_barcode_from_fastq(
     py: Python<'_>,
-    in_fq: PathBuf,
+    in_fq: &str,
     out_fq: PathBuf,
     regex: &str,
     out_barcode: Option<Bound<'_, PyAny>>,
     from_description: bool,
     compression: Option<&str>,
     compression_level: Option<u32>,
+    input_compression: Option<&str>,
     num_threads: u32,
 ) -> Result<()> {
-    let mut reader = Reader::new(BufReader::new(open_file_for_read(in_fq)?));
+    if is_url(in_fq) && input_compression.is_none() {
+        log::warn!("The input source is a URL and input_compression is None");
+    }
+
+    let re = Regex::new(regex)?;
 
     let mut fq_writer = {
         let compression = compression.map(|x| Compression::from_str(x).unwrap())
             .or((&out_fq).try_into().ok());
-        Writer::new(BufWriter::new(open_file_for_write(out_fq, compression, compression_level, num_threads)?))
+        Writer::new(BufWriter::new(create_file(out_fq, compression, compression_level, num_threads)?))
     };
 
     let (barcode_keys, mut barcode_writers) = if let Some(output) = out_barcode {
@@ -135,40 +147,108 @@ fn strip_barcode_from_fastq(
         let writers = files.into_iter().map(|file| {
             let compression = compression.map(|x| Compression::from_str(x).unwrap())
                 .or((&file).try_into().ok());
-            Writer::new(BufWriter::new(open_file_for_write(file, compression, compression_level, num_threads).unwrap()))
+            Writer::new(BufWriter::new(create_file(file, compression, compression_level, num_threads).unwrap()))
         }).collect();
         (Some(keys), writers)
     } else {
         (None, Vec::new())
     };
 
-    let re = Regex::new(regex)?;
-    reader.records().enumerate().try_for_each(|(i, record)| {
-        if i % 1000000 == 0 {
-            py.check_signals().unwrap();
-        }
-        let record = record?;
-        let (record, barcodes) = strip_fastq(
-            record, &re, barcode_keys.as_ref().map(|x| x.as_slice()), from_description
-        )?;
-
-        fq_writer.write_record(&record)?;
-        if let Some(barcodes) = barcodes {
-            barcodes.into_iter().enumerate().for_each(|(i, barcode)| {
-                let qual_score = vec![b'~'; barcode.len()];
-                let fq = fastq::Record::new(
-                    fastq::record::Definition::new(record.name(), ""),
-                    barcode.into_bytes(),
-                    qual_score,
-                );
-                barcode_writers.get_mut(i).unwrap().write_record(&fq).unwrap();
-            })
-        }
+    let rt = Runtime::new()?;
+    rt.block_on(async {
+        let reader = open_file_async(in_fq, input_compression.map(|x| Compression::from_str(x).unwrap())).await?;
+        let mut reader = fastq::AsyncReader::new(tokio::io::BufReader::new(reader));
+
+        let mut i = 0u32;
+        reader.records().for_each(|record| {
+            if i % 1000000 == 0 {
+                py.check_signals().unwrap();
+            }
+            i += 1;
+
+            let record = record.unwrap();
+            let (record, barcodes) = strip_fastq(
+                record, &re, barcode_keys.as_ref().map(|x| x.as_slice()), from_description
+            ).unwrap();
+
+            fq_writer.write_record(&record).unwrap();
+            if let Some(barcodes) = barcodes {
+                barcodes.into_iter().enumerate().for_each(|(i, barcode)| {
+                    let qual_score = vec![b'~'; barcode.len()];
+                    let fq = fastq::Record::new(
+                        fastq::record::Definition::new(record.name(), ""),
+                        barcode.into_bytes(),
+                        qual_score,
+                    );
+                    barcode_writers.get_mut(i).unwrap().write_record(&fq).unwrap();
+                })
+            }
 
-        anyhow::Ok(())
-    })?;
+            futures::future::ready(())
+        }).await;
 
-    Ok(())
+        Ok(())
+    })
+}
+
+/// Convert a BAM file to a FASTQ file.
+///
+/// This function reads a BAM file and converts the alignments back to a FASTQ file.
+/// The function will ignore secondary/supplementary alignments.
+///
+/// Parameters
+/// ----------
+/// input: str
+///     File path or URL to the input BAM file.
+/// output: Path
+///     File path to the output FASTQ file.
+/// compression: Literal['gzip', 'zst'] | None
+///     Compression algorithm to use. If None, the compression algorithm will be inferred from the file extension.
+/// compression_level: int | None
+///     Compression level to use.
+#[pyfunction]
+#[pyo3(
+    signature = (input, output, *, compression=None, compression_level=None),
+    text_signature = "(input, output, *, compression=None, compression_level=None)",
+)]
+fn bam_to_fastq(
+    py: Python<'_>,
+    input: &str,
+    output: PathBuf,
+    compression: Option<&str>,
+    compression_level: Option<u32>,
+) -> Result<()> {
+    fn bam_to_fq(bam: &bam::Record) -> fastq::Record {
+        let name = bam.name().unwrap();
+        let seq: Vec<u8> = bam.sequence().iter().collect();
+        let qual: Vec<u8> = bam.quality_scores().iter().map(|x| x.unwrap() + 33).collect();
+        fastq::Record::new(fastq::record::Definition::new(name, ""), seq, qual)
+    }
+
+    let compression = compression.map(|x| Compression::from_str(x).unwrap())
+        .or((&output).try_into().ok());
+    let mut writer = Writer::new(BufWriter::new(create_file(output, compression, compression_level, 8)?));
+
+    let rt = Runtime::new()?;
+    rt.block_on(async {
+        let reader = seqspec::utils::open_file_async(input, None).await?;
+        let mut reader = bam::r#async::io::Reader::new(reader);
+        reader.read_header().await?;
+
+        let mut n = 0u32;
+        while let Some(record) = reader.records().try_next().await? {
+            if n % 100000 == 0 {
+                py.check_signals()?;
+            }
+
+            let flag = record.flags();
+            if flag.is_unmapped() || !(flag.is_secondary() || flag.is_supplementary()) {
+                writer.write_record(&bam_to_fq(&record))?;
+            }
+            n += 1;
+        }
+        Ok(())
+    })
+}
 
 #[pymodule]
@@ -176,6 +256,7 @@ pub(crate) fn register_submodule(parent_module: &Bound<'_, PyModule>) -> PyResult<()> {
     let utils = PyModule::new_bound(parent_module.py(), "utils")?;
 
     utils.add_function(wrap_pyfunction!(strip_barcode_from_fastq, &utils)?)?;
+    utils.add_function(wrap_pyfunction!(bam_to_fastq, &utils)?)?;
 
     parent_module.add_submodule(&utils)
 }
diff --git a/seqspec/Cargo.toml b/seqspec/Cargo.toml
index 9521c95..56b46ff 100644
--- a/seqspec/Cargo.toml
+++ b/seqspec/Cargo.toml
@@ -11,9 +11,14 @@ indexmap = "2.5"
 log = "0.4"
 hamming = "0.1"
 home = "0.5"
-noodles = { version = "0.80", features = ["core", "fastq", "async"] }
-reqwest = { version = "0.12", features = ["blocking"] }
+noodles = { version = "0.85", features = ["core", "fastq", "async"] }
+url = "2.5"
+reqwest = { version = "0.12", features = ["stream", "blocking"] }
 serde = { version = "1.0", features = ["derive", "rc"] }
 serde_yaml = "0.9"
 zstd = { version = "0.13", features = ["zstdmt"] }
-multi_reader = "0.1"
\ No newline at end of file
+multi_reader = "0.1"
+tokio = "1.41"
+tokio-util = "0.7"
+futures = "0.3"
+async-compression = { version = "0.4", features = ["gzip", "zstd", "tokio"] }
\ No newline at end of file
diff --git a/seqspec/src/lib.rs b/seqspec/src/lib.rs
index 852becd..8e5d3d1 100644
--- a/seqspec/src/lib.rs
+++ b/seqspec/src/lib.rs
@@ -21,7 +21,7 @@ use std::{
     str::FromStr,
     sync::{Arc, RwLock},
 };
-use utils::{open_file_for_write, 
Compression};
+use utils::{create_file, Compression};
 
 #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
 pub struct Assay {
@@ -403,13 +403,13 @@ impl Assay {
             .map(|(read, _)| {
                 let output_valid = dir.as_ref().join(format!("{}.fq.zst", read.read_id));
                 let output_valid =
-                    open_file_for_write(output_valid, Some(Compression::Zstd), Some(9), 8)?;
+                    create_file(output_valid, Some(Compression::Zstd), Some(9), 8)?;
                 let output_valid = fastq::io::Writer::new(output_valid);
                 let output_other = dir
                     .as_ref()
                     .join(format!("Invalid_{}.fq.zst", read.read_id));
                 let output_other =
-                    open_file_for_write(output_other, Some(Compression::Zstd), Some(9), 8)?;
+                    create_file(output_other, Some(Compression::Zstd), Some(9), 8)?;
                 let output_other = fastq::io::Writer::new(output_other);
 
                 anyhow::Ok((output_valid, output_other))
diff --git a/seqspec/src/read.rs b/seqspec/src/read.rs
index 7e66397..0e64b1f 100644
--- a/seqspec/src/read.rs
+++ b/seqspec/src/read.rs
@@ -296,12 +296,12 @@ impl File {
     /// The base_dir is used to resolve relative paths.
     pub fn open(&self) -> Result<Box<dyn Read>> {
         match self.urltype {
-            UrlType::Local => Ok(Box::new(crate::utils::open_file_for_read(&self.url)?)),
+            UrlType::Local => Ok(Box::new(crate::utils::open_file(&self.url)?)),
             _ => {
                 let mut cache = Cache::new().unwrap();
                 cache.dir = home::home_dir().unwrap().join(".cache/seqspec");
                 let file = cache.cached_path(&self.url).unwrap();
-                Ok(Box::new(crate::utils::open_file_for_read(file)?))
+                Ok(Box::new(crate::utils::open_file(file)?))
             }
         }
     }
diff --git a/seqspec/src/region.rs b/seqspec/src/region.rs
index 1741aa3..9235dbf 100644
--- a/seqspec/src/region.rs
+++ b/seqspec/src/region.rs
@@ -277,7 +277,7 @@ impl Onlist {
         let mut cache = Cache::new()?;
         cache.dir = home::home_dir().unwrap().join(".cache/seqspec");
         let file = cache.cached_path(&self.url)?;
-        let reader = std::io::BufReader::new(crate::utils::open_file_for_read(file)?);
+        let reader = std::io::BufReader::new(crate::utils::open_file(file)?);
         reader.lines().map(|x| Ok(x?.into_bytes())).collect()
     }
 
diff --git a/seqspec/src/utils.rs b/seqspec/src/utils.rs
index a0945b0..202716a 100644
--- a/seqspec/src/utils.rs
+++ b/seqspec/src/utils.rs
@@ -1,22 +1,104 @@
-use std::{fs::File, io::{BufWriter, Write}, path::{Path, PathBuf}, str::FromStr};
-use anyhow::{Context, Result, anyhow};
+use anyhow::{anyhow, bail, Context, Result};
+use futures::StreamExt;
+use std::{
+    fs::File, io::{BufWriter, Write}, path::{Path, PathBuf}, str::FromStr
+};
+use tokio::io::AsyncRead;
+use tokio_util::io::StreamReader;
+use async_compression::tokio::bufread::GzipDecoder;
+use async_compression::tokio::bufread::ZstdDecoder;
+use url::Url;
+
+pub fn create_file<P: AsRef<Path>>(
+    filename: P,
+    compression: Option<Compression>,
+    compression_level: Option<u32>,
+    num_threads: u32,
+) -> Result<Box<dyn Write>> {
+    let buffer = BufWriter::new(
+        File::create(&filename)
+            .with_context(|| format!("cannot create file: {}", filename.as_ref().display()))?,
+    );
+    let writer: Box<dyn Write> = match compression {
+        None => Box::new(buffer),
+        Some(Compression::Gzip) => Box::new(flate2::write::GzEncoder::new(
+            buffer,
+            flate2::Compression::new(compression_level.unwrap_or(6)),
+        )),
+        Some(Compression::Zstd) => {
+            let mut zstd =
+                zstd::stream::Encoder::new(buffer, compression_level.unwrap_or(9) as i32)?;
+            zstd.multithread(num_threads)?;
+            Box::new(zstd.auto_finish())
+        }
+    };
+    Ok(writer)
+}
 
 /// Open a file, possibly compressed. Supports gzip and zstd.
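+/// A minimal usage sketch (assuming a gzip-compressed `reads.fq.gz` on disk):
+///
+/// ```ignore
+/// let reader = open_file("reads.fq.gz")?; // decompression is inferred automatically
+/// let mut fq_reader = noodles::fastq::io::Reader::new(std::io::BufReader::new(reader));
+/// ```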
-pub fn open_file_for_read<P: AsRef<Path>>(file: P) -> Result<Box<dyn Read>> {
+pub fn open_file<P: AsRef<Path>>(file: P) -> Result<Box<dyn Read>> {
     let reader: Box<dyn Read> = match detect_compression(file.as_ref()) {
-        Some(Compression::Gzip) => Box::new(flate2::read::MultiGzDecoder::new(File::open(file.as_ref())?)),
+        Some(Compression::Gzip) => Box::new(flate2::read::MultiGzDecoder::new(File::open(
+            file.as_ref(),
+        )?)),
         Some(Compression::Zstd) => {
             let r = zstd::stream::read::Decoder::new(File::open(file.as_ref())?)?;
             Box::new(r)
-        },
+        }
         None => Box::new(File::open(file.as_ref())?),
     };
     anyhow::Ok(reader).with_context(|| format!("cannot open file: {:?}", file.as_ref()))
 }
 
+/// Fetch content from a URL or a file, possibly compressed. Supports gzip and zstd.
+pub async fn open_file_async(src: &str, compression: Option<Compression>) -> Result<Box<dyn AsyncRead + Unpin>> {
+    if is_url(src) {
+        return Ok(Box::new(open_url_async(Url::parse(src).unwrap()).await?));
+    }
+
+    let reader = tokio::io::BufReader::new(tokio::fs::File::open(src).await?);
+
+    let compression = compression.or_else(|| detect_compression(src));
+    let reader: Box<dyn AsyncRead + Unpin> = match compression {
+        Some(Compression::Gzip) => Box::new(GzipDecoder::new(reader)),
+        Some(Compression::Zstd) => Box::new(ZstdDecoder::new(reader)),
+        None => Box::new(reader),
+    };
+    Ok(reader)
+}
+
+pub fn is_url(src: &str) -> bool {
+    if let Ok(url) = Url::parse(src) {
+        if !url.cannot_be_a_base() {
+            return true;
+        }
+    }
+    false
+}
+
+async fn open_url_async(url: Url) -> Result<impl AsyncRead + Unpin> {
+    // Create a client with automatic redirect following
+    let client = reqwest::Client::builder()
+        .redirect(reqwest::redirect::Policy::limited(10)) // Follow up to 10 redirects
+        .build()?;
+
+    let response = client.get(url).send().await?;
+    if !response.status().is_success() {
+        bail!("Request failed with status: {}", response.status());
+    }
+
+    // Convert the response into a byte stream
+    let byte_stream = response
+        .bytes_stream()
+        .map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
+    Ok(StreamReader::new(byte_stream))
+}
+
 /// Determine the file compression type. Supports gzip and zstd.
 fn detect_compression<P: AsRef<Path>>(path: P) -> Option<Compression> {
-    let file = File::open(path.as_ref()).with_context(|| format!("cannot open file: {:?}", path.as_ref())).unwrap();
+    let file = File::open(path.as_ref())
+        .with_context(|| format!("cannot open file: {:?}", path.as_ref()))
+        .unwrap();
     if flate2::read::MultiGzDecoder::new(file).header().is_some() {
         Some(Compression::Gzip)
     } else if let Some(ext) = path.as_ref().extension() {
@@ -63,35 +145,17 @@ impl TryFrom<&PathBuf> for Compression {
     }
 }
 
-pub fn open_file_for_write<P: AsRef<Path>>(
-    filename: P,
-    compression: Option<Compression>,
-    compression_level: Option<u32>,
-    num_threads: u32,
-) -> Result<Box<dyn Write>> {
-    let buffer = BufWriter::new(
-        File::create(&filename).with_context(|| format!("cannot create file: {}", filename.as_ref().display()))?
-    );
-    let writer: Box<dyn Write> = match compression {
-        None => Box::new(buffer),
-        Some(Compression::Gzip) => Box::new(flate2::write::GzEncoder::new(buffer, flate2::Compression::new(compression_level.unwrap_or(6)))),
-        Some(Compression::Zstd) => {
-            let mut zstd = zstd::stream::Encoder::new(buffer, compression_level.unwrap_or(9) as i32)?;
-            zstd.multithread(num_threads)?;
-            Box::new(zstd.auto_finish())
-        },
-    };
-    Ok(writer)
-}
-
 pub fn rev_compl(seq: &[u8]) -> Vec<u8> {
-    seq.iter().rev().map(|&x| match x {
-        b'A' => b'T',
-        b'T' => b'A',
-        b'C' => b'G',
-        b'G' => b'C',
-        _ => x,
-    }).collect()
+    seq.iter()
+        .rev()
+        .map(|&x| match x {
+            b'A' => b'T',
+            b'T' => b'A',
+            b'C' => b'G',
+            b'G' => b'C',
+            _ => x,
+        })
+        .collect()
 }
 
 pub fn normalize_path<P1: AsRef<Path>, P2: AsRef<Path>>(work_dir: P1, path: P2) -> Result<PathBuf> {
@@ -101,10 +165,15 @@ pub fn normalize_path<P1: AsRef<Path>, P2: AsRef<Path>>(work_dir: P1, path: P2)
     } else {
         work_dir.as_ref().join(path)
     };
-    result.canonicalize().with_context(|| format!("Failed to normalize path: {:?}", &result))
+    result
+        .canonicalize()
+        .with_context(|| format!("Failed to normalize path: {:?}", &result))
 }
 
-pub fn unnormalize_path<P1: AsRef<Path>, P2: AsRef<Path>>(work_dir: P1, path: P2) -> Result<PathBuf> {
+pub fn unnormalize_path<P1: AsRef<Path>, P2: AsRef<Path>>(
+    work_dir: P1,
+    path: P2,
+) -> Result<PathBuf> {
     let work_dir = work_dir.as_ref().canonicalize()?;
     let path = path.as_ref().canonicalize()?;
     Ok(relative_path(work_dir.as_path(), path.as_path()))
@@ -174,4 +243,4 @@ mod tests {
         let to = Path::new("/a/b/c");
         assert_eq!(relative_path(from, to), PathBuf::from(""));
     }
-}
\ No newline at end of file
+}