From be5bc177a0aad427503e6b8f34a5e1428f01e3b6 Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Sun, 27 Oct 2024 13:42:47 +0800 Subject: [PATCH 1/2] generalize to more than 2 barcodes --- precellar/src/align.rs | 579 +++++++++++++++++++++------------------ precellar/src/barcode.rs | 15 +- python/src/lib.rs | 59 +++- python/src/utils.rs | 1 - seqspec/src/lib.rs | 2 +- seqspec/src/read.rs | 10 +- seqspec/src/region.rs | 4 +- 7 files changed, 385 insertions(+), 285 deletions(-) diff --git a/precellar/src/align.rs b/precellar/src/align.rs index 300b2e5..21715d7 100644 --- a/precellar/src/align.rs +++ b/precellar/src/align.rs @@ -1,16 +1,16 @@ -use crate::barcode::{BarcodeCorrector, Whitelist}; +use crate::barcode::{BarcodeCorrector, OligoFrequncy, Whitelist}; use crate::qc::{AlignQC, Metrics}; use anyhow::{bail, Result}; use bstr::BString; use bwa_mem2::BurrowsWheelerAligner; use either::Either; +use indexmap::IndexMap; use kdam::{tqdm, BarExt}; use log::info; use noodles::sam::alignment::record_buf::data::field::value::Value; use noodles::sam::alignment::{record::data::field::tag::Tag, record_buf::RecordBuf, Record}; use noodles::{bam, fastq, sam}; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use seqspec::{Assay, Modality, Read, RegionIndex, RegionType, SequenceType}; +use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType}; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::io::BufRead; @@ -33,6 +33,32 @@ pub trait Alinger { ) -> impl ExactSizeIterator; } +pub struct DummyAligner; + +impl Alinger for DummyAligner { + fn chunk_size(&self) -> usize { + 0 + } + + fn header(&self) -> sam::Header { + sam::Header::default() + } + + fn align_reads( + &mut self, + _records: &mut [fastq::Record], + ) -> impl ExactSizeIterator { + std::iter::empty() + } + + fn align_read_pairs( + &mut self, + _records: &mut [(fastq::Record, fastq::Record)], + ) -> impl ExactSizeIterator { + std::iter::empty() + } +} + impl Alinger for BurrowsWheelerAligner { fn chunk_size(&self) -> usize { self.chunk_size() @@ -121,23 +147,10 @@ impl FastqProcessor { pub fn gen_barcoded_alignments( &mut self, ) -> impl Iterator, Vec<(RecordBuf, RecordBuf)>>> + '_ { - info!("Counting barcodes..."); - // let whitelist = self.count_barcodes().unwrap(); - let whitelists = self.count_barcodes().unwrap(); - for (i, whitelist) in whitelists.iter().enumerate() { - info!( - "Found {} barcodes. {:.2}% of them have an exact match in whitelist_{}", - whitelist.total_count, - whitelist.frac_exact_match() * 100.0, - i + 1 - ); - } + let fq_reader = self.gen_barcoded_fastq(true); + let is_paired = fq_reader.is_paired_end(); info!("Aligning reads..."); - let fq_records = self.gen_raw_fastq_records(); - let is_paired = fq_records.is_paired_end(); - let corrector = - BarcodeCorrector::default().with_bc_confidence_threshold(self.barcode_correct_prob); let header = self.aligner.header(); self.align_qc.insert( self.modality(), @@ -148,98 +161,36 @@ impl FastqProcessor { ); let align_qc = self.align_qc.get(&self.modality()).unwrap().clone(); - // An auxiliary function to process two barcodes - fn process_two_barcodes<'a>( - corrector: &'a BarcodeCorrector, - whitelists: &'a [Whitelist], - barcode: &'a [Barcode], - mismatch_in_barcode: usize, - ) -> (Option>, String, String) { - let corrected_barcode_1 = corrector - .correct( - whitelists[0].get_barcode_counts(), - barcode[0].sequence(), - barcode[0].quality_scores(), - mismatch_in_barcode, - ) - .ok(); - let corrected_barcode_2 = corrector - .correct( - whitelists[1].get_barcode_counts(), - barcode[1].sequence(), - barcode[1].quality_scores(), - mismatch_in_barcode, - ) - .ok(); - let concat_ori_barcode = barcode - .iter() - .map(|x| String::from_utf8_lossy(x.sequence())) - .collect::>() - .join("|"); - let concat_ori_qual = barcode - .iter() - .map(|x| String::from_utf8_lossy(x.quality_scores())) - .collect::>() - .join("|"); - let concat_corrected_barcode = [corrected_barcode_1, corrected_barcode_2] - .iter() - .map(|x| { - x.map_or_else( - || "N".to_string(), - |y| String::from_utf8_lossy(y).to_string(), - ) - }) - .collect::>() - .join("-"); - let concat_corrected_barcode = if concat_corrected_barcode == "N-N" { - None - } else { - Some(concat_corrected_barcode.into_bytes()) - }; - ( - concat_corrected_barcode, - concat_ori_barcode, - concat_ori_qual, - ) - } - - let mut progress_bar = tqdm!(total = whitelists[0].total_count); - fq_records - .chunk(self.aligner.chunk_size()) + let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0)); + let fq_reader = VectorChunk::new(fq_reader, self.aligner.chunk_size()); + fq_reader .map(move |data| { if is_paired { let (barcodes, mut reads): (Vec<_>, Vec<_>) = data .into_iter() - .map(|(barcode, (read1, read2))| { - (barcode, (read1.unwrap(), read2.unwrap())) + .map(|rec| { + (rec.barcode.unwrap(), (rec.read1.unwrap(), rec.read2.unwrap())) }) .unzip(); let alignments: Vec<_> = self.aligner.align_read_pairs(&mut reads).collect(); let results = barcodes - .into_par_iter() + .into_iter() .zip(alignments) .map(|(barcode, (ali1, ali2))| { - let (concat_corrected_barcode, concat_ori_barcode, concat_ori_qual) = - process_two_barcodes( - &corrector, - &whitelists, - &barcode, - self.mismatch_in_barcode, - ); let ali1_ = add_cell_barcode( &header, &ali1, - concat_ori_barcode.as_bytes(), - concat_ori_qual.as_bytes(), - concat_corrected_barcode.as_deref(), + barcode.raw.sequence(), + barcode.raw.quality_scores(), + barcode.corrected.as_deref(), ) .unwrap(); let ali2_ = add_cell_barcode( &header, &ali2, - concat_ori_barcode.as_bytes(), - concat_ori_qual.as_bytes(), - concat_corrected_barcode.as_deref(), + barcode.raw.sequence(), + barcode.raw.quality_scores(), + barcode.corrected.as_deref(), ) .unwrap(); { @@ -255,27 +206,19 @@ impl FastqProcessor { } else { let (barcodes, mut reads): (Vec<_>, Vec<_>) = data .into_iter() - .map(|(barcode, (read1, _))| (barcode, read1.unwrap())) + .map(|rec| (rec.barcode.unwrap(), rec.read1.unwrap())) .unzip(); let alignments: Vec<_> = self.aligner.align_reads(&mut reads).collect(); let results = barcodes - .into_par_iter() + .into_iter() .zip(alignments) .map(|(barcode, alignment)| { - let corrected_barcode = corrector - .correct( - whitelists[0].get_barcode_counts(), - barcode[0].sequence(), - barcode[0].quality_scores(), - self.mismatch_in_barcode, - ) - .ok(); let ali = add_cell_barcode( &header, &alignment, - barcode[0].sequence(), - barcode[0].quality_scores(), - corrected_barcode, + barcode.raw.sequence(), + barcode.raw.quality_scores(), + barcode.corrected.as_deref(), ) .unwrap(); { @@ -290,53 +233,45 @@ impl FastqProcessor { }) } - pub fn gen_raw_alignments( - &mut self, - ) -> impl Iterator, Vec<(sam::Record, sam::Record)>>> + '_ { - let fq_records = self.gen_raw_fastq_records(); - let is_paired = fq_records.is_paired_end(); - fq_records - .chunk(self.aligner.chunk_size()) - .map(move |data| { - if is_paired { - let mut reads: Vec<_> = data - .into_iter() - .map(|(_, (read1, read2))| (read1.unwrap(), read2.unwrap())) - .collect(); - let alignments = self.aligner.align_read_pairs(&mut reads).collect(); - Either::Right(alignments) - } else { - let mut reads: Vec<_> = data - .into_iter() - .map(|(_, (read1, _))| read1.unwrap()) - .collect(); - let alignments = self.aligner.align_reads(reads.as_mut()).collect(); - Either::Left(alignments) - } - }) - } - - pub fn gen_raw_fastq_records(&self) -> FastqRecords { + pub fn gen_barcoded_fastq(&mut self, correct_barcode: bool) -> AnnotatedFastqReader { let modality = self.modality(); - let data = self + + let whitelists = if correct_barcode { + info!("Counting barcodes..."); + // let whitelist = self.count_barcodes().unwrap(); + let whitelists = self.count_barcodes().unwrap(); + for (id, whitelist) in whitelists.iter() { + info!( + "Found {} barcodes. {:.2}% of them have an exact match in whitelist {}", + whitelist.total_count, + whitelist.frac_exact_match() * 100.0, + id + ); + } + whitelists + } else { + IndexMap::new() + }; + + let corrector = BarcodeCorrector::default() + .with_max_missmatch(self.mismatch_in_barcode) + .with_bc_confidence_threshold(self.barcode_correct_prob); + + let mut fq_reader: AnnotatedFastqReader = self .assay .get_index_by_modality(modality) .filter_map(|(read, index)| { - let regions: Vec<_> = index - .index - .into_iter() - .filter(|x| x.1.is_barcode() || x.1.is_target()) - .collect(); - if regions.is_empty() { - None - } else { - Some((read, regions, read.open().unwrap())) - } - }); - FastqRecords::new(data) + let annotator = FastqAnnotator::new(read, index, &whitelists, corrector.clone())?; + Some((annotator, read.open().unwrap())) + }) + .collect(); + if !whitelists.is_empty() { + fq_reader.total_reads = Some(whitelists[0].total_count); + } + fq_reader } - fn count_barcodes(&mut self) -> Result> { + fn count_barcodes(&mut self) -> Result> { let modality = self.modality(); let mut whitelists = self.get_whitelists()?; @@ -373,194 +308,287 @@ impl FastqProcessor { self.metrics.entry(modality).or_default().insert( "frac_q30_bases_barcode".to_string(), - whitelists.iter().map(|x| x.frac_q30_bases()).sum::() / whitelists.len() as f64, + whitelists.values().map(|x| x.frac_q30_bases()).sum::() / whitelists.len() as f64, ); Ok(whitelists) } - fn get_whitelists(&self) -> Result> { + fn get_whitelists(&self) -> Result> { let regions = self .assay .library_spec .get_modality(&self.modality()) .unwrap() .read() - .unwrap(); - let regions: Vec<_> = regions + .map_err(|_| anyhow::anyhow!("Cannot obtain lock"))?; + regions .subregions .iter() - .filter(|r| r.read().unwrap().region_type.is_barcode()) - .collect(); - - let mut whitelist = Vec::new(); - - if regions.len() == 1 || regions.len() == 2 { - for region in regions { - if region.read().unwrap().sequence_type == SequenceType::Onlist { - whitelist.push(Whitelist::new( - region.read().unwrap().onlist.as_ref().unwrap().read()?, - )); + .filter_map(|r| { + let r = r.read().unwrap(); + if r.region_type.is_barcode() { + if let Some(onlist) = r.onlist.as_ref() { + let list = onlist.read().map(|list| + (r.region_id.to_string(), Whitelist::new(list)) + ); + Some(list) + } else { + None + } } else { - whitelist.push(Whitelist::empty()); + None } - } - Ok(whitelist) - } else { - bail!("No barcode region found or too many barcode regions found"); - } + }).collect() } } -pub struct FastqRecords(Vec>); - -struct FastqRecord { - id: String, - is_reverse: bool, - subregion: Vec<(RegionType, Range)>, - reader: fastq::Reader, - min_len: usize, - max_len: usize, +pub struct AnnotatedFastqReader { + buffer: fastq::Record, + total_reads: Option, + inner: Vec<(FastqAnnotator, fastq::Reader>)>, } -pub type Barcode = fastq::Record; -pub type UMI = fastq::Record; - -impl FastqRecords { - fn new<'a, I>(iter: I) -> Self - where - I: Iterator< - Item = ( - &'a Read, - Vec<(String, RegionType, Range)>, - fastq::Reader, - ), - >, - { - let records = iter - .map(|(read, regions, reader)| FastqRecord { - id: read.read_id.to_string(), - is_reverse: read.is_reverse(), - subregion: regions.into_iter().map(|x| (x.1, x.2)).collect(), - reader, - min_len: read.min_len as usize, - max_len: read.max_len as usize, - }) - .collect(); - Self(records) - } - - fn chunk(self, chunk_size: usize) -> FastqRecordChunk { - FastqRecordChunk { - fq: self, - chunk_size, - } - } - - fn is_paired_end(&self) -> bool { - let mut read1 = false; - let mut read2 = false; - self.0.iter().for_each(|x| { - x.subregion.iter().for_each(|(region_type, _)| { +impl AnnotatedFastqReader { + pub fn is_paired_end(&self) -> bool { + let mut has_read1 = false; + let mut has_read2 = false; + self.inner.iter().for_each(|x| { + x.0.subregions.iter().for_each(|(_, region_type, _)| { if region_type.is_target() { - if x.is_reverse { - read1 = true; + if x.0.is_reverse { + has_read1 = true; } else { - read2 = true; + has_read2 = true; } } }); }); - read1 && read2 + has_read1 && has_read2 } } -impl Iterator for FastqRecords { - type Item = (Vec, (Option, Option)); +impl FromIterator<(FastqAnnotator, fastq::Reader>)> for AnnotatedFastqReader { + fn from_iter>)>>(iter: T) -> Self { + Self { + buffer: fastq::Record::default(), + total_reads: None, + inner: iter.into_iter().collect(), + } + } +} + +impl Iterator for AnnotatedFastqReader { + type Item = AnnotatedRecord; fn next(&mut self) -> Option { - let mut id_without_record = Vec::new(); + let mut missing = None; let records: SmallVec<[_; 4]> = self - .0 + .inner .iter_mut() - .map(|x| { - let mut record = fastq::Record::default(); - if x.reader - .read_record(&mut record) + .flat_map(|(annotator, reader)| { + if reader + .read_record(&mut self.buffer) .expect("error reading fastq record") == 0 { - id_without_record.push(x.id.as_str()); + missing = Some(annotator.id.as_str()); None } else { - let n = record.sequence().len(); - if n < x.min_len || n > x.max_len { - panic!( - "Read length ({}) out of range: {}-{}", - n, x.min_len, x.max_len - ); - } - Some(record) + Some(annotator.annotate(&self.buffer).unwrap()) } }) .collect(); - if id_without_record.len() == records.len() { - return None; - } else if !id_without_record.is_empty() { - panic!( - "Missing records in these files: {}", - id_without_record.join(",") - ); + + if records.is_empty() { + return None + } else if missing.is_some() { + panic!("Missing records in this file: {}", missing.unwrap()); + } else { + Some(records.into_iter().reduce(|mut this, other| { + this.join(other); + this + }).unwrap()) + } + + } +} + +/// A FastqAnnotator that splits the reads into subregions, e.g., barcode, UMI, and +/// return annotated reads. +struct FastqAnnotator { + whitelists: IndexMap, + corrector: BarcodeCorrector, + id: String, + is_reverse: bool, + subregions: Vec<(String, RegionType, Range)>, + min_len: usize, + max_len: usize, +} + +impl<'a> FastqAnnotator { + pub fn new(read: &Read, index: RegionIndex, whitelists: &IndexMap, corrector: BarcodeCorrector) -> Option { + let subregions: Vec<_> = index.index.into_iter() + .filter(|x| x.1.is_barcode() || x.1.is_target()) // only barcode and target regions + .collect(); + if subregions.is_empty() { + None + } else { + let whitelists = subregions.iter() + .flat_map(|(id, _, _)| { + let v = whitelists.get(id)?; + Some((id.clone(), v.get_barcode_counts().clone())) + }).collect(); + let anno = Self { + whitelists, + corrector, + id: read.read_id.clone(), + is_reverse: read.is_reverse(), + subregions, + min_len: read.min_len as usize, + max_len: read.max_len as usize, + }; + Some(anno) } + } - let mut barcode = Vec::new(); + fn annotate(&self, record: &fastq::Record) -> Result { + let n = record.sequence().len(); + if n < self.min_len || n > self.max_len { + bail!("Read length ({}) out of range: {}-{}", n, self.min_len, self.max_len); + } + + let mut barcode: Option = None; + let mut umi = None; let mut read1 = None; let mut read2 = None; - records.iter().enumerate().for_each(|(i, r)| { - let record = r.as_ref().unwrap(); - self.0[i].subregion.iter().for_each(|(region_type, range)| { - // println!( - // "{}:{:?} Region type: {:?}", - // i, self.0[i].is_reverse, region_type - // ); - let fq = slice_fastq_record(record, range.start as usize, range.end as usize); - let is_reverse = self.0[i].is_reverse; - if region_type.is_barcode() { - if is_reverse { - // barcode = Some(rev_compl_fastq_record(fq)); - barcode.push(rev_compl_fastq_record(fq)); + self.subregions.iter().for_each(|(id, region_type, range)| { + let mut fq = slice_fastq_record(&record, range.start as usize, range.end as usize); + if region_type.is_barcode() { + if self.is_reverse { + fq = rev_compl_fastq_record(fq); + } + let corrected = self.whitelists.get(id).map_or(Some(fq.sequence().to_vec()), |counts| + self.corrector.correct( + counts, + fq.sequence(), + fq.quality_scores(), + ).ok().map(|x| x.to_vec()) + ); + if let Some(bc) = &mut barcode { + bc.extend(&Barcode { raw: fq, corrected }); + } else { + barcode = Some(Barcode { raw: fq, corrected }); + } + } else if region_type.is_target() { + if self.is_reverse { + if let Some(s) = &mut read2 { + extend_fastq_record(s, &fq); } else { - // barcode = Some(fq); - barcode.push(fq); + read2 = Some(fq); } - } else if region_type.is_target() { - if is_reverse { - read1 = Some(fq); + } else { + if let Some(s) = &mut read1 { + extend_fastq_record(s, &fq); } else { - read2 = Some(fq); + read1 = Some(fq); } } - }); + } }); - // let barcode = barcode.expect("barcode should be present"); - Some((barcode, (read1, read2))) + Ok(AnnotatedRecord { barcode, umi, read1, read2 }) } } -pub struct FastqRecordChunk { - fq: FastqRecords, +pub struct Barcode { + pub raw: fastq::Record, + pub corrected: Option>, +} + +impl Barcode { + pub fn extend(&mut self, other: &Self) { + extend_fastq_record(&mut self.raw, &other.raw); + if let Some(c2) = &other.corrected { + if let Some(c1) = &mut self.corrected { + c1.extend_from_slice(c2); + } + } else { + self.corrected = None; + } + } +} + +pub type UMI = fastq::Record; + +/// An annotated fastq record with barcode, UMI, and sequence. +pub struct AnnotatedRecord { + pub barcode: Option, + pub umi: Option, + pub read1: Option, + pub read2: Option, +} + +impl AnnotatedRecord { + pub fn len(&self) -> usize { + self.read1.as_ref().map_or(0, |x| x.sequence().len()) + + self.read2.as_ref().map_or(0, |x| x.sequence().len()) + } +} + +impl AnnotatedRecord { + pub fn join(&mut self, other: Self) { + if let Some(bc) = &mut self.barcode { + other.barcode.as_ref().map(|x| bc.extend(x)); + } else { + self.barcode = other.barcode; + } + + if let Some(umi) = &mut self.umi { + other.umi.as_ref().map(|x| extend_fastq_record(umi, x)); + } else { + self.umi = other.umi; + } + + if self.read1.is_some() { + if other.read1.is_some() { + panic!("Read1 already exists"); + } + } else { + self.read1 = other.read1; + } + + if self.read2.is_some() { + if other.read2.is_some() { + panic!("Read2 already exists"); + } + } else { + self.read2 = other.read2; + } + } +} + +pub struct VectorChunk { + inner: I, chunk_size: usize, } -impl Iterator for FastqRecordChunk { - type Item = Vec<(Vec, (Option, Option))>; +impl VectorChunk { + pub fn new(inner: I, chunk_size: usize) -> Self { + Self { + inner, + chunk_size, + } + } +} + +impl> Iterator for VectorChunk { + type Item = Vec; fn next(&mut self) -> Option { let mut chunk = Vec::new(); let mut accumulated_length = 0; - for record in self.fq.by_ref() { - accumulated_length += record.1 .0.as_ref().map_or(0, |x| x.sequence().len()) - + record.1 .1.as_ref().map_or(0, |x| x.sequence().len()); + for record in self.inner.by_ref() { + accumulated_length += record.len(); chunk.push(record); if accumulated_length >= self.chunk_size { break; @@ -606,6 +634,11 @@ fn slice_fastq_record(record: &fastq::Record, start: usize, end: usize) -> fastq ) } +pub fn extend_fastq_record(this: &mut fastq::Record, other: &fastq::Record) { + this.sequence_mut().extend_from_slice(other.sequence()); + this.quality_scores_mut().extend_from_slice(other.quality_scores()); +} + fn rev_compl_fastq_record(mut record: fastq::Record) -> fastq::Record { *record.quality_scores_mut() = record.quality_scores().iter().rev().copied().collect(); *record.sequence_mut() = record diff --git a/precellar/src/barcode.rs b/precellar/src/barcode.rs index aac326f..64c4e07 100644 --- a/precellar/src/barcode.rs +++ b/precellar/src/barcode.rs @@ -269,6 +269,7 @@ impl Whitelist { } /// A barcode validator that uses a barcode counter to validate barcodes. +#[derive(Debug, Clone)] pub struct BarcodeCorrector { /// threshold for sum of probability of error on barcode QVs. Barcodes exceeding /// this threshold will be marked as not valid. @@ -276,6 +277,8 @@ pub struct BarcodeCorrector { /// if the posterior probability of a correction /// exceeds this threshold, the barcode will be corrected. bc_confidence_threshold: f64, + /// The number of mismatches allowed in barcode + max_mismatch: usize, } impl Default for BarcodeCorrector { @@ -283,6 +286,7 @@ impl Default for BarcodeCorrector { Self { max_expected_errors: f64::MAX, bc_confidence_threshold: 0.975, + max_mismatch: 1, } } } @@ -292,6 +296,11 @@ impl BarcodeCorrector { self.bc_confidence_threshold = threshold; self } + + pub fn with_max_missmatch(mut self, max_mismatch: usize) -> Self { + self.max_mismatch = max_mismatch; + self + } } #[derive(Copy, Clone, Debug)] @@ -313,14 +322,13 @@ impl BarcodeCorrector { barcode_counts: &'a OligoFrequncy, barcode: &'a [u8], qual: &[u8], - n_mismatch: usize, ) -> Result<&[u8], BarcodeError> { let expected_errors: f64 = qual.iter().map(|&q| error_probability(q)).sum(); if expected_errors >= self.max_expected_errors { return Err(BarcodeError::ExceedExpectedError(expected_errors)); } - let (bc, prob) = barcode_counts.likelihood(barcode, qual, n_mismatch); + let (bc, prob) = barcode_counts.likelihood(barcode, qual, self.max_mismatch); if prob <= 0.0 { Err(BarcodeError::NoMatch) } else if prob >= self.bc_confidence_threshold { @@ -331,9 +339,6 @@ impl BarcodeCorrector { } } -/// Barcode correction problem: Given a whitelist of barcodes, and a sequenced barcode with quality scores, -/// decide which barcode in the whitelist generated the sequenced barcode. - /// Convert Illumina quality scores to base-calling error probabilities, i.e., /// the probability of an incorrect base call. #[inline(always)] diff --git a/python/src/lib.rs b/python/src/lib.rs index ae1c7d3..292d5f5 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,9 +1,11 @@ mod utils; mod pyseqspec; -use std::{collections::HashMap, path::PathBuf, str::FromStr}; +use noodles::fastq::io::Writer; +use std::{io::BufWriter, collections::HashMap, path::PathBuf, str::FromStr}; use bwa_mem2::{AlignerOpts, BurrowsWheelerAligner, FMIndex, PairedEndStats}; use either::Either; +use ::precellar::align::{extend_fastq_record, DummyAligner}; use pyo3::prelude::*; use anyhow::Result; use noodles::{bgzf, sam::alignment::io::Write}; @@ -250,6 +252,60 @@ fn make_fragment( Ok(report.into()) } +#[pyfunction] +#[pyo3( + signature = (assay, *, modality, out_dir), + text_signature = "(assay, *, modality, out_dir)", +)] +fn make_fastq( + py: Python<'_>, + assay: Bound<'_, PyAny>, + modality: &str, + out_dir: PathBuf, +) -> Result<()> +{ + let modality = Modality::from_str(modality).unwrap(); + let spec = match assay.extract::() { + Ok(p) => seqspec::Assay::from_path(&p).unwrap(), + _ => assay.extract::>()?.0.clone(), + }; + + let aligner = DummyAligner; + let mut processor = FastqProcessor::new(spec, aligner).with_modality(modality); + let fq_reader = processor.gen_barcoded_fastq(false); + + std::fs::create_dir_all(&out_dir)?; + let read1_fq = out_dir.join("R1.fq.zst"); + let read1_writer = open_file_for_write(read1_fq, Some(Compression::Zstd), None, 8)?; + let mut read1_writer = Writer::new(BufWriter::new(read1_writer)); + let mut read2_writer = if fq_reader.is_paired_end() { + let read2_fq = out_dir.join("R2.fq.zst"); + let read2_writer = open_file_for_write(read2_fq, Some(Compression::Zstd), None, 8)?; + let read2_writer = Writer::new(BufWriter::new(read2_writer)); + Some(read2_writer) + } else { + None + }; + + for (i, record) in fq_reader.enumerate() { + if i % 1000000 == 0 { + py.check_signals().unwrap(); + } + let mut bc = record.barcode.unwrap().raw; + if let Some(umi) = record.umi { + extend_fastq_record(&mut bc, &umi); + } + extend_fastq_record(&mut bc, &record.read1.unwrap()); + + read1_writer.write_record(&bc)?; + if let Some(writer) = &mut read2_writer { + writer.write_record(&record.read2.unwrap())?; + } + } + + Ok(()) +} + /// A Python module implemented in Rust. #[pymodule] @@ -265,6 +321,7 @@ fn precellar(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(make_genome_index, m)?)?; m.add_function(wrap_pyfunction!(align, m)?)?; m.add_function(wrap_pyfunction!(make_fragment, m)?)?; + m.add_function(wrap_pyfunction!(make_fastq, m)?)?; utils::register_submodule(m)?; Ok(()) diff --git a/python/src/utils.rs b/python/src/utils.rs index f12086e..ec557e2 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -97,7 +97,6 @@ fn strip_barcode_from_fastq( in_fq: PathBuf, out_fq: PathBuf, regex: &str, - //out_barcode: Option, out_barcode: Option>, from_description: bool, compression: Option<&str>, diff --git a/seqspec/src/lib.rs b/seqspec/src/lib.rs index 3d942e1..852becd 100644 --- a/seqspec/src/lib.rs +++ b/seqspec/src/lib.rs @@ -9,7 +9,7 @@ pub use read::RegionIndex; pub use read::{File, Read, Strand, UrlType}; use read::{ReadSpan, ValidateResult}; use region::LibSpec; -pub use region::{Onlist, Region, RegionType, SequenceType}; +pub use region::{Onlist, Region, RegionId, RegionType, SequenceType}; use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Deserializer, Serialize}; diff --git a/seqspec/src/read.rs b/seqspec/src/read.rs index 1bc07ba..7e66397 100644 --- a/seqspec/src/read.rs +++ b/seqspec/src/read.rs @@ -83,7 +83,10 @@ impl Default for Read { } impl Read { - pub fn open(&self) -> Option> { + /// Open the fastq files for reading, and return a fastq reader. + /// If the read has multiple fastq files, they will be concatenated. + /// If the read has no fastq files, return None. + pub fn open(&self) -> Option>> { let files = self .files .clone() @@ -96,9 +99,10 @@ impl Read { } let reader = multi_reader::MultiReader::new(files.into_iter().map(move |file| file.open().unwrap())); - Some(fastq::Reader::new(BufReader::new(reader))) + Some(fastq::Reader::new(Box::new(BufReader::new(reader)))) } + /// Get the actual length of the read by reading the first record from the fastq file. pub fn actual_len(&self) -> Result { let mut reader = self.open().unwrap(); let mut record = fastq::Record::default(); @@ -155,7 +159,7 @@ impl Read { } } - /// Get the regions of the read. + /// Helper function to get the region index for a read. fn get_read_span<'a, I>(&self, mut regions: I) -> RegionIndex where I: Iterator>>, diff --git a/seqspec/src/region.rs b/seqspec/src/region.rs index f3bd7b1..1741aa3 100644 --- a/seqspec/src/region.rs +++ b/seqspec/src/region.rs @@ -106,9 +106,11 @@ impl LibSpec { } } +pub type RegionId = String; + #[derive(Deserialize, Serialize, Debug, Clone)] pub struct Region { - pub region_id: String, + pub region_id: RegionId, pub region_type: RegionType, pub name: String, pub sequence_type: SequenceType, From 733d87575cc8318e86f823ec73d0770f618ef624 Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Sun, 27 Oct 2024 14:02:13 +0800 Subject: [PATCH 2/2] handle UMIs --- docs/api.rst | 1 + precellar/src/align.rs | 38 +++++++++++++++++++++++++++++++++----- python/src/lib.rs | 13 +++++++++++++ 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 8c6e08a..be793a5 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -24,6 +24,7 @@ Core functions make_genome_index align make_fragment + make_fastq Utilities ~~~~~~~~~ diff --git a/precellar/src/align.rs b/precellar/src/align.rs index 21715d7..85af6b6 100644 --- a/precellar/src/align.rs +++ b/precellar/src/align.rs @@ -349,6 +349,32 @@ pub struct AnnotatedFastqReader { } impl AnnotatedFastqReader { + pub fn get_all_barcodes(&self) -> Vec<(&str, usize)> { + self.inner + .iter() + .flat_map(|(annotator, _)| { + annotator + .subregions + .iter() + .filter(|(_, region_type, _)| region_type.is_barcode()) + .map(|(id, _, r)| (id.as_str(), r.len())) + }) + .collect() + } + + pub fn get_all_umi(&self) -> Vec<(&str, usize)> { + self.inner + .iter() + .flat_map(|(annotator, _)| { + annotator + .subregions + .iter() + .filter(|(_, region_type, _)| region_type.is_umi()) + .map(|(id, _, r)| (id.as_str(), r.len())) + }) + .collect() + } + pub fn is_paired_end(&self) -> bool { let mut has_read1 = false; let mut has_read2 = false; @@ -428,7 +454,7 @@ struct FastqAnnotator { impl<'a> FastqAnnotator { pub fn new(read: &Read, index: RegionIndex, whitelists: &IndexMap, corrector: BarcodeCorrector) -> Option { let subregions: Vec<_> = index.index.into_iter() - .filter(|x| x.1.is_barcode() || x.1.is_target()) // only barcode and target regions + .filter(|x| x.1.is_barcode() || x.1.is_umi() || x.1.is_target()) // only barcode and target regions .collect(); if subregions.is_empty() { None @@ -463,10 +489,12 @@ impl<'a> FastqAnnotator { let mut read2 = None; self.subregions.iter().for_each(|(id, region_type, range)| { let mut fq = slice_fastq_record(&record, range.start as usize, range.end as usize); - if region_type.is_barcode() { - if self.is_reverse { - fq = rev_compl_fastq_record(fq); - } + if self.is_reverse && (region_type.is_barcode() || region_type.is_umi()) { + fq = rev_compl_fastq_record(fq); + } + if region_type.is_umi() { + umi = Some(fq); + } else if region_type.is_barcode() { let corrected = self.whitelists.get(id).map_or(Some(fq.sequence().to_vec()), |counts| self.corrector.correct( counts, diff --git a/python/src/lib.rs b/python/src/lib.rs index 292d5f5..b89f5f0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,6 +1,7 @@ mod utils; mod pyseqspec; +use log::info; use noodles::fastq::io::Writer; use std::{io::BufWriter, collections::HashMap, path::PathBuf, str::FromStr}; use bwa_mem2::{AlignerOpts, BurrowsWheelerAligner, FMIndex, PairedEndStats}; @@ -252,6 +253,9 @@ fn make_fragment( Ok(report.into()) } +/// Generate consolidated fastq files from the sequencing specification. +/// The barcodes and UMIs are concatenated to the read 1 sequence. +/// Fixed sequences and linkers are removed. #[pyfunction] #[pyo3( signature = (assay, *, modality, out_dir), @@ -274,6 +278,15 @@ fn make_fastq( let mut processor = FastqProcessor::new(spec, aligner).with_modality(modality); let fq_reader = processor.gen_barcoded_fastq(false); + info!( + "Adding the following barcodes to Read 1: {}", + fq_reader.get_all_barcodes().into_iter().map(|(x, n)| format!("{} ({})", x, n)).join(" + "), + ); + info!( + "Adding the following UMIs to Read 1: {}", + fq_reader.get_all_umi().into_iter().map(|(x, n)| format!("{} ({})", x, n)).join(" + "), + ); + std::fs::create_dir_all(&out_dir)?; let read1_fq = out_dir.join("R1.fq.zst"); let read1_writer = open_file_for_write(read1_fq, Some(Compression::Zstd), None, 8)?;