Skip to content

Commit

Permalink
AlignQC doesn't need Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Nov 5, 2024
1 parent a545e5e commit 8d18e00
Showing 1 changed file with 149 additions and 90 deletions.
239 changes: 149 additions & 90 deletions precellar/src/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ use either::Either;
use indexmap::IndexMap;
use kdam::{tqdm, BarExt};
use log::info;
use noodles::sam::alignment::record_buf::data::field::value::Value;
use noodles::sam::alignment::{record::data::field::tag::Tag, record_buf::RecordBuf, Record};
use noodles::sam::alignment::record_buf::{data::field::value::Value, RecordBuf};
use noodles::sam::alignment::{record::data::field::tag::Tag, Record};
use noodles::{bam, fastq, sam};
use rayon::iter::ParallelIterator;
use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType};
use smallvec::SmallVec;
use star_aligner::StarAligner;
use std::collections::{HashMap, HashSet};
use std::io::BufRead;
use std::ops::Range;
use std::sync::{Arc, Mutex};
use std::{
collections::{HashMap, HashSet},
io::BufRead,
ops::Range,
};

pub trait AsIterator {
type Item;
type AsIter<'a>: Iterator<Item = &'a Self::Item> where Self: 'a;
type AsIter<'a>: Iterator<Item = &'a Self::Item>
where
Self: 'a;

fn as_iter(&self) -> Self::AsIter<'_>;
}
Expand Down Expand Up @@ -51,9 +54,17 @@ pub trait Aligner {

fn header(&self) -> sam::Header;

fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput>;
fn align_reads(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<Self::AlignOutput>;

fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)>;
fn align_read_pairs(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<(Self::AlignOutput, Self::AlignOutput)>;
}

pub struct DummyAligner;
Expand All @@ -73,7 +84,11 @@ impl Aligner for DummyAligner {
Vec::new()
}

fn align_read_pairs(&mut self, _: &sam::Header, _: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
fn align_read_pairs(
&mut self,
_: &sam::Header,
_: Vec<AnnotatedRecord>,
) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
Vec::new()
}
}
Expand All @@ -89,28 +104,39 @@ impl Aligner for BurrowsWheelerAligner {
self.get_sam_header()
}

fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
fn align_reads(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<Self::AlignOutput> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap()))
.unzip();

// TODO: add UMI
self.align_reads(reads.as_mut_slice()).enumerate().map(|(i, alignment)| {
let (bc, umi) = info.get(i).unwrap();
add_cell_barcode(
header,
&alignment,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
}).collect()
self.align_reads(reads.as_mut_slice())
.enumerate()
.map(|(i, alignment)| {
let (bc, umi) = info.get(i).unwrap();
add_cell_barcode(
header,
&alignment,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
})
.collect()
}

fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
fn align_read_pairs(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| {
(
Expand All @@ -119,26 +145,29 @@ impl Aligner for BurrowsWheelerAligner {
)
})
.unzip();
self.align_read_pairs(&mut reads).enumerate().map(|(i, (ali1, ali2))| {
let (bc, umi) = info.get(i).unwrap();
let ali1_ = add_cell_barcode(
&header,
&ali1,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
let ali2_ = add_cell_barcode(
&header,
&ali2,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
(ali1_, ali2_)
}).collect()
self.align_read_pairs(&mut reads)
.enumerate()
.map(|(i, (ali1, ali2))| {
let (bc, umi) = info.get(i).unwrap();
let ali1_ = add_cell_barcode(
&header,
&ali1,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
let ali2_ = add_cell_barcode(
&header,
&ali2,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
(ali1_, ali2_)
})
.collect()
}
}

Expand All @@ -153,7 +182,11 @@ impl Aligner for StarAligner {
self.get_header().clone()
}

fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
fn align_reads(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<Self::AlignOutput> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap()))
Expand All @@ -162,23 +195,34 @@ impl Aligner for StarAligner {
// TODO: StarAligner can expose a method to align a single read instead of a batch,
// so that barcode and UMI processing can be done in parallel.
StarAligner::align_reads(self, reads.as_mut_slice())
.collect::<Vec<_>>().into_iter().enumerate().map(|(i, alignment)| {
.collect::<Vec<_>>()
.into_iter()
.enumerate()
.map(|(i, alignment)| {
let (bc, umi) = info.get(i).unwrap();
alignment.into_iter().map(|x|
add_cell_barcode(
header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
).collect()
}).collect()
}

fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
alignment
.into_iter()
.map(|x| {
add_cell_barcode(
header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
})
.collect()
})
.collect()
}

fn align_read_pairs(
&mut self,
header: &sam::Header,
records: Vec<AnnotatedRecord>,
) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| {
(
Expand All @@ -188,26 +232,40 @@ impl Aligner for StarAligner {
})
.unzip();
StarAligner::align_read_pairs(self, &mut reads)
.collect::<Vec<_>>().into_iter().enumerate().map(|(i, (ali1, ali2))| {
.collect::<Vec<_>>()
.into_iter()
.enumerate()
.map(|(i, (ali1, ali2))| {
let (bc, umi) = info.get(i).unwrap();
let ali1_ = ali1.into_iter().map(|x| add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()).collect();
let ali2_ = ali2.into_iter().map(|x| add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()).collect();
let ali1_ = ali1
.into_iter()
.map(|x| {
add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
})
.collect();
let ali2_ = ali2
.into_iter()
.map(|x| {
add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
})
.collect();
(ali1_, ali2_)
}).collect()
})
.collect()
}
}

Expand All @@ -217,7 +275,7 @@ pub struct FastqProcessor<A> {
current_modality: Option<Modality>,
mito_dna: HashSet<usize>,
metrics: HashMap<Modality, Metrics>,
align_qc: HashMap<Modality, Arc<Mutex<AlignQC>>>,
align_qc: HashMap<Modality, AlignQC>,
barcode_correct_prob: f64, // if the posterior probability of a correction
// exceeds this threshold, the barcode will be corrected.
// cellrange uses 0.975 for ATAC and 0.9 for multiome.
Expand Down Expand Up @@ -267,44 +325,45 @@ impl<A: Aligner> FastqProcessor<A> {
.get(&self.modality())
.map_or(Metrics::default(), |x| x.clone());
if let Some(align_qc) = self.align_qc.get(&self.modality()) {
align_qc.lock().unwrap().report(&mut metrics);
align_qc.report(&mut metrics);
}
metrics
}

pub fn gen_barcoded_alignments(
&mut self,
) -> impl Iterator<Item = Either<Vec<A::AlignOutput>, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ {
) -> impl Iterator<Item = Either<Vec<A::AlignOutput>, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_
{
let fq_reader = self.gen_barcoded_fastq(true);
let is_paired = fq_reader.is_paired_end();
let modality = self.modality();

info!("Aligning reads...");
let header = self.aligner.header();
self.align_qc.insert(
self.modality(),
Arc::new(Mutex::new(AlignQC {
modality,
AlignQC {
mito_dna: self.mito_dna.clone(),
..AlignQC::default()
})),
},
);
let align_qc = self.align_qc.get(&self.modality()).unwrap().clone();

let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0));
let fq_reader = VectorChunk::new(fq_reader, self.aligner.chunk_size());
fq_reader.map(move |data| {
let mut align_qc_lock = align_qc.lock().unwrap();
let align_qc = self.align_qc.get_mut(&modality).unwrap();
if is_paired {
let results: Vec<_> = self.aligner.align_read_pairs(&header, data);
results.iter().for_each(|(ali1, ali2)| {
ali1.as_iter().for_each(|x| align_qc_lock.update(x, &header));
ali2.as_iter().for_each(|x| align_qc_lock.update(x, &header));
ali1.as_iter().for_each(|x| align_qc.update(x, &header));
ali2.as_iter().for_each(|x| align_qc.update(x, &header));
});
progress_bar.update(results.len()).unwrap();
Either::Right(results)
} else {
let results: Vec<_> = self.aligner.align_reads(&header, data);
results.iter().for_each(|ali| {
ali.as_iter().for_each(|x| align_qc_lock.update(x, &header));
ali.as_iter().for_each(|x| align_qc.update(x, &header));
});
progress_bar.update(results.len()).unwrap();
Either::Left(results)
Expand Down

0 comments on commit 8d18e00

Please sign in to comment.