From 8d18e00b212fbb6057c9e0ea30b3682f4af94cfc Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Wed, 6 Nov 2024 07:45:44 +0800 Subject: [PATCH] `AlignQC` doesn't need Mutex --- precellar/src/align.rs | 239 +++++++++++++++++++++++++---------------- 1 file changed, 149 insertions(+), 90 deletions(-) diff --git a/precellar/src/align.rs b/precellar/src/align.rs index bb46fd1..4fe924c 100644 --- a/precellar/src/align.rs +++ b/precellar/src/align.rs @@ -7,21 +7,24 @@ use either::Either; use indexmap::IndexMap; use kdam::{tqdm, BarExt}; use log::info; -use noodles::sam::alignment::record_buf::data::field::value::Value; -use noodles::sam::alignment::{record::data::field::tag::Tag, record_buf::RecordBuf, Record}; +use noodles::sam::alignment::record_buf::{data::field::value::Value, RecordBuf}; +use noodles::sam::alignment::{record::data::field::tag::Tag, Record}; use noodles::{bam, fastq, sam}; use rayon::iter::ParallelIterator; use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType}; use smallvec::SmallVec; use star_aligner::StarAligner; -use std::collections::{HashMap, HashSet}; -use std::io::BufRead; -use std::ops::Range; -use std::sync::{Arc, Mutex}; +use std::{ + collections::{HashMap, HashSet}, + io::BufRead, + ops::Range, +}; pub trait AsIterator { type Item; - type AsIter<'a>: Iterator where Self: 'a; + type AsIter<'a>: Iterator + where + Self: 'a; fn as_iter(&self) -> Self::AsIter<'_>; } @@ -51,9 +54,17 @@ pub trait Aligner { fn header(&self) -> sam::Header; - fn align_reads(&mut self, header: &sam::Header, records: Vec) -> Vec; + fn align_reads( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec; - fn align_read_pairs(&mut self, header: &sam::Header, records: Vec) -> Vec<(Self::AlignOutput, Self::AlignOutput)>; + fn align_read_pairs( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec<(Self::AlignOutput, Self::AlignOutput)>; } pub struct DummyAligner; @@ -73,7 +84,11 @@ impl Aligner for DummyAligner { Vec::new() } - fn align_read_pairs(&mut self, _: &sam::Header, _: Vec) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { + fn align_read_pairs( + &mut self, + _: &sam::Header, + _: Vec, + ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { Vec::new() } } @@ -89,28 +104,39 @@ impl Aligner for BurrowsWheelerAligner { self.get_sam_header() } - fn align_reads(&mut self, header: &sam::Header, records: Vec) -> Vec { + fn align_reads( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec { let (info, mut reads): (Vec<_>, Vec<_>) = records .into_iter() .map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap())) .unzip(); // TODO: add UMI - self.align_reads(reads.as_mut_slice()).enumerate().map(|(i, alignment)| { - let (bc, umi) = info.get(i).unwrap(); - add_cell_barcode( - header, - &alignment, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap() - }).collect() + self.align_reads(reads.as_mut_slice()) + .enumerate() + .map(|(i, alignment)| { + let (bc, umi) = info.get(i).unwrap(); + add_cell_barcode( + header, + &alignment, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap() + }) + .collect() } - fn align_read_pairs(&mut self, header: &sam::Header, records: Vec) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { - let (info, mut reads): (Vec<_>, Vec<_>) = records + fn align_read_pairs( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { + let (info, mut reads): (Vec<_>, Vec<_>) = records .into_iter() .map(|rec| { ( @@ -119,26 +145,29 @@ impl Aligner for BurrowsWheelerAligner { ) }) .unzip(); - self.align_read_pairs(&mut reads).enumerate().map(|(i, (ali1, ali2))| { - let (bc, umi) = info.get(i).unwrap(); - let ali1_ = add_cell_barcode( - &header, - &ali1, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap(); - let ali2_ = add_cell_barcode( - &header, - &ali2, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap(); - (ali1_, ali2_) - }).collect() + self.align_read_pairs(&mut reads) + .enumerate() + .map(|(i, (ali1, ali2))| { + let (bc, umi) = info.get(i).unwrap(); + let ali1_ = add_cell_barcode( + &header, + &ali1, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap(); + let ali2_ = add_cell_barcode( + &header, + &ali2, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap(); + (ali1_, ali2_) + }) + .collect() } } @@ -153,7 +182,11 @@ impl Aligner for StarAligner { self.get_header().clone() } - fn align_reads(&mut self, header: &sam::Header, records: Vec) -> Vec { + fn align_reads( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec { let (info, mut reads): (Vec<_>, Vec<_>) = records .into_iter() .map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap())) @@ -162,23 +195,34 @@ impl Aligner for StarAligner { // TODO: StarAligner can expose a method to align a single read instead of a batch, // so that barcode and UMI processing can be done in parallel. StarAligner::align_reads(self, reads.as_mut_slice()) - .collect::>().into_iter().enumerate().map(|(i, alignment)| { + .collect::>() + .into_iter() + .enumerate() + .map(|(i, alignment)| { let (bc, umi) = info.get(i).unwrap(); - alignment.into_iter().map(|x| - add_cell_barcode( - header, - &x, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap() - ).collect() - }).collect() - } - - fn align_read_pairs(&mut self, header: &sam::Header, records: Vec) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { - let (info, mut reads): (Vec<_>, Vec<_>) = records + alignment + .into_iter() + .map(|x| { + add_cell_barcode( + header, + &x, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap() + }) + .collect() + }) + .collect() + } + + fn align_read_pairs( + &mut self, + header: &sam::Header, + records: Vec, + ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { + let (info, mut reads): (Vec<_>, Vec<_>) = records .into_iter() .map(|rec| { ( @@ -188,26 +232,40 @@ impl Aligner for StarAligner { }) .unzip(); StarAligner::align_read_pairs(self, &mut reads) - .collect::>().into_iter().enumerate().map(|(i, (ali1, ali2))| { + .collect::>() + .into_iter() + .enumerate() + .map(|(i, (ali1, ali2))| { let (bc, umi) = info.get(i).unwrap(); - let ali1_ = ali1.into_iter().map(|x| add_cell_barcode( - &header, - &x, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap()).collect(); - let ali2_ = ali2.into_iter().map(|x| add_cell_barcode( - &header, - &x, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - .unwrap()).collect(); + let ali1_ = ali1 + .into_iter() + .map(|x| { + add_cell_barcode( + &header, + &x, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap() + }) + .collect(); + let ali2_ = ali2 + .into_iter() + .map(|x| { + add_cell_barcode( + &header, + &x, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ) + .unwrap() + }) + .collect(); (ali1_, ali2_) - }).collect() + }) + .collect() } } @@ -217,7 +275,7 @@ pub struct FastqProcessor { current_modality: Option, mito_dna: HashSet, metrics: HashMap, - align_qc: HashMap>>, + align_qc: HashMap, barcode_correct_prob: f64, // if the posterior probability of a correction // exceeds this threshold, the barcode will be corrected. // cellrange uses 0.975 for ATAC and 0.9 for multiome. @@ -267,44 +325,45 @@ impl FastqProcessor { .get(&self.modality()) .map_or(Metrics::default(), |x| x.clone()); if let Some(align_qc) = self.align_qc.get(&self.modality()) { - align_qc.lock().unwrap().report(&mut metrics); + align_qc.report(&mut metrics); } metrics } pub fn gen_barcoded_alignments( &mut self, - ) -> impl Iterator, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ { + ) -> impl Iterator, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ + { let fq_reader = self.gen_barcoded_fastq(true); let is_paired = fq_reader.is_paired_end(); + let modality = self.modality(); info!("Aligning reads..."); let header = self.aligner.header(); self.align_qc.insert( - self.modality(), - Arc::new(Mutex::new(AlignQC { + modality, + AlignQC { mito_dna: self.mito_dna.clone(), ..AlignQC::default() - })), + }, ); - let align_qc = self.align_qc.get(&self.modality()).unwrap().clone(); let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0)); let fq_reader = VectorChunk::new(fq_reader, self.aligner.chunk_size()); fq_reader.map(move |data| { - let mut align_qc_lock = align_qc.lock().unwrap(); + let align_qc = self.align_qc.get_mut(&modality).unwrap(); if is_paired { let results: Vec<_> = self.aligner.align_read_pairs(&header, data); results.iter().for_each(|(ali1, ali2)| { - ali1.as_iter().for_each(|x| align_qc_lock.update(x, &header)); - ali2.as_iter().for_each(|x| align_qc_lock.update(x, &header)); + ali1.as_iter().for_each(|x| align_qc.update(x, &header)); + ali2.as_iter().for_each(|x| align_qc.update(x, &header)); }); progress_bar.update(results.len()).unwrap(); Either::Right(results) } else { let results: Vec<_> = self.aligner.align_reads(&header, data); results.iter().for_each(|ali| { - ali.as_iter().for_each(|x| align_qc_lock.update(x, &header)); + ali.as_iter().for_each(|x| align_qc.update(x, &header)); }); progress_bar.update(results.len()).unwrap(); Either::Left(results)