Skip to content

Commit

Permalink
add StarAligner
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Nov 5, 2024
1 parent c780e48 commit a545e5e
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 95 deletions.
1 change: 1 addition & 0 deletions precellar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
anyhow = "1.0"
bed-utils = "0.5.1"
bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "07eda9b9c2815ae52b3fa30b01de0e19fae31fe0" }
star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "f9915ea3afbac1e8f4773e2e7c22376f1549c3c7" }
bstr = "1.0"
either = "1.13"
itertools = "0.13"
Expand Down
265 changes: 171 additions & 94 deletions precellar/src/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,57 @@ use log::info;
use noodles::sam::alignment::record_buf::data::field::value::Value;
use noodles::sam::alignment::{record::data::field::tag::Tag, record_buf::RecordBuf, Record};
use noodles::{bam, fastq, sam};
use rayon::iter::ParallelIterator;
use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType};
use smallvec::SmallVec;
use star_aligner::StarAligner;
use std::collections::{HashMap, HashSet};
use std::io::BufRead;
use std::ops::Range;
use std::sync::{Arc, Mutex};

pub trait Alinger {
pub trait AsIterator {
type Item;
type AsIter<'a>: Iterator<Item = &'a Self::Item> where Self: 'a;

fn as_iter(&self) -> Self::AsIter<'_>;
}

impl AsIterator for RecordBuf {
type Item = RecordBuf;
type AsIter<'a> = std::iter::Once<&'a RecordBuf>;

fn as_iter(&self) -> Self::AsIter<'_> {
std::iter::once(&self)
}
}

impl AsIterator for Vec<RecordBuf> {
type Item = RecordBuf;
type AsIter<'a> = std::slice::Iter<'a, RecordBuf>;

fn as_iter(&self) -> Self::AsIter<'_> {
self.iter()
}
}

pub trait Aligner {
type AlignOutput: AsIterator<Item = RecordBuf>;

fn chunk_size(&self) -> usize;

fn header(&self) -> sam::Header;

fn align_reads(
&mut self,
records: &mut [fastq::Record],
) -> impl ExactSizeIterator<Item = sam::Record>;
fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput>;

fn align_read_pairs(
&mut self,
records: &mut [(fastq::Record, fastq::Record)],
) -> impl ExactSizeIterator<Item = (sam::Record, sam::Record)>;
fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)>;
}

pub struct DummyAligner;

impl Alinger for DummyAligner {
impl Aligner for DummyAligner {
type AlignOutput = RecordBuf;

fn chunk_size(&self) -> usize {
0
}
Expand All @@ -44,22 +69,18 @@ impl Alinger for DummyAligner {
sam::Header::default()
}

fn align_reads(
&mut self,
_records: &mut [fastq::Record],
) -> impl ExactSizeIterator<Item = sam::Record> {
std::iter::empty()
fn align_reads(&mut self, _: &sam::Header, _: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
Vec::new()
}

fn align_read_pairs(
&mut self,
_records: &mut [(fastq::Record, fastq::Record)],
) -> impl ExactSizeIterator<Item = (sam::Record, sam::Record)> {
std::iter::empty()
fn align_read_pairs(&mut self, _: &sam::Header, _: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
Vec::new()
}
}

impl Alinger for BurrowsWheelerAligner {
impl Aligner for BurrowsWheelerAligner {
type AlignOutput = RecordBuf;

fn chunk_size(&self) -> usize {
self.chunk_size()
}
Expand All @@ -68,18 +89,125 @@ impl Alinger for BurrowsWheelerAligner {
self.get_sam_header()
}

fn align_reads(
&mut self,
records: &mut [fastq::Record],
) -> impl ExactSizeIterator<Item = sam::Record> {
self.align_reads(records)
fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap()))
.unzip();

// TODO: add UMI
self.align_reads(reads.as_mut_slice()).enumerate().map(|(i, alignment)| {
let (bc, umi) = info.get(i).unwrap();
add_cell_barcode(
header,
&alignment,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
}).collect()
}

fn align_read_pairs(
&mut self,
records: &mut [(fastq::Record, fastq::Record)],
) -> impl ExactSizeIterator<Item = (sam::Record, sam::Record)> {
self.align_read_pairs(records)
fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| {
(
(rec.barcode.unwrap(), rec.umi),
(rec.read1.unwrap(), rec.read2.unwrap()),
)
})
.unzip();
self.align_read_pairs(&mut reads).enumerate().map(|(i, (ali1, ali2))| {
let (bc, umi) = info.get(i).unwrap();
let ali1_ = add_cell_barcode(
&header,
&ali1,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
let ali2_ = add_cell_barcode(
&header,
&ali2,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap();
(ali1_, ali2_)
}).collect()
}
}

impl Aligner for StarAligner {
type AlignOutput = Vec<RecordBuf>;

fn chunk_size(&self) -> usize {
0
}

fn header(&self) -> sam::Header {
self.get_header().clone()
}

fn align_reads(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<Self::AlignOutput> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap()))
.unzip();

// TODO: StarAligner can expose a method to align a single read instead of a batch,
// so that barcode and UMI processing can be done in parallel.
StarAligner::align_reads(self, reads.as_mut_slice())
.collect::<Vec<_>>().into_iter().enumerate().map(|(i, alignment)| {
let (bc, umi) = info.get(i).unwrap();
alignment.into_iter().map(|x|
add_cell_barcode(
header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()
).collect()
}).collect()
}

fn align_read_pairs(&mut self, header: &sam::Header, records: Vec<AnnotatedRecord>) -> Vec<(Self::AlignOutput, Self::AlignOutput)> {
let (info, mut reads): (Vec<_>, Vec<_>) = records
.into_iter()
.map(|rec| {
(
(rec.barcode.unwrap(), rec.umi),
(rec.read1.unwrap(), rec.read2.unwrap()),
)
})
.unzip();
StarAligner::align_read_pairs(self, &mut reads)
.collect::<Vec<_>>().into_iter().enumerate().map(|(i, (ali1, ali2))| {
let (bc, umi) = info.get(i).unwrap();
let ali1_ = ali1.into_iter().map(|x| add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()).collect();
let ali2_ = ali2.into_iter().map(|x| add_cell_barcode(
&header,
&x,
bc.raw.sequence(),
bc.raw.quality_scores(),
bc.corrected.as_deref(),
)
.unwrap()).collect();
(ali1_, ali2_)
}).collect()
}
}

Expand All @@ -96,7 +224,7 @@ pub struct FastqProcessor<A> {
mismatch_in_barcode: usize, // The number of mismatches allowed in barcode
}

impl<A: Alinger> FastqProcessor<A> {
impl<A: Aligner> FastqProcessor<A> {
pub fn new(assay: Assay, aligner: A) -> Self {
Self {
assay,
Expand Down Expand Up @@ -146,7 +274,7 @@ impl<A: Alinger> FastqProcessor<A> {

pub fn gen_barcoded_alignments(
&mut self,
) -> impl Iterator<Item = Either<Vec<RecordBuf>, Vec<(RecordBuf, RecordBuf)>>> + '_ {
) -> impl Iterator<Item = Either<Vec<A::AlignOutput>, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ {
let fq_reader = self.gen_barcoded_fastq(true);
let is_paired = fq_reader.is_paired_end();

Expand All @@ -164,71 +292,20 @@ impl<A: Alinger> FastqProcessor<A> {
let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0));
let fq_reader = VectorChunk::new(fq_reader, self.aligner.chunk_size());
fq_reader.map(move |data| {
let mut align_qc_lock = align_qc.lock().unwrap();
if is_paired {
let (barcodes, mut reads): (Vec<_>, Vec<_>) = data
.into_iter()
.map(|rec| {
(
rec.barcode.unwrap(),
(rec.read1.unwrap(), rec.read2.unwrap()),
)
})
.unzip();
let alignments: Vec<_> = self.aligner.align_read_pairs(&mut reads).collect();
let results = barcodes
.into_iter()
.zip(alignments)
.map(|(barcode, (ali1, ali2))| {
let ali1_ = add_cell_barcode(
&header,
&ali1,
barcode.raw.sequence(),
barcode.raw.quality_scores(),
barcode.corrected.as_deref(),
)
.unwrap();
let ali2_ = add_cell_barcode(
&header,
&ali2,
barcode.raw.sequence(),
barcode.raw.quality_scores(),
barcode.corrected.as_deref(),
)
.unwrap();
{
let mut align_qc_lock = align_qc.lock().unwrap();
align_qc_lock.update(&ali1_, &header);
align_qc_lock.update(&ali2_, &header);
}
(ali1_, ali2_)
})
.collect::<Vec<_>>();
let results: Vec<_> = self.aligner.align_read_pairs(&header, data);
results.iter().for_each(|(ali1, ali2)| {
ali1.as_iter().for_each(|x| align_qc_lock.update(x, &header));
ali2.as_iter().for_each(|x| align_qc_lock.update(x, &header));
});
progress_bar.update(results.len()).unwrap();
Either::Right(results)
} else {
let (barcodes, mut reads): (Vec<_>, Vec<_>) = data
.into_iter()
.map(|rec| (rec.barcode.unwrap(), rec.read1.unwrap()))
.unzip();
let alignments: Vec<_> = self.aligner.align_reads(&mut reads).collect();
let results = barcodes
.into_iter()
.zip(alignments)
.map(|(barcode, alignment)| {
let ali = add_cell_barcode(
&header,
&alignment,
barcode.raw.sequence(),
barcode.raw.quality_scores(),
barcode.corrected.as_deref(),
)
.unwrap();
{
align_qc.lock().unwrap().update(&ali, &header);
}
ali
})
.collect::<Vec<_>>();
let results: Vec<_> = self.aligner.align_reads(&header, data);
results.iter().for_each(|ali| {
ali.as_iter().for_each(|x| align_qc_lock.update(x, &header));
});
progress_bar.update(results.len()).unwrap();
Either::Left(results)
}
Expand Down
1 change: 1 addition & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ crate-type = ["cdylib"]
[dependencies]
anyhow = "1.0"
bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "07eda9b9c2815ae52b3fa30b01de0e19fae31fe0" }
star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "f9915ea3afbac1e8f4773e2e7c22376f1549c3c7" }
bstr = "1.0"
either = "1.13"
itertools = "0.13"
Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{collections::HashMap, io::BufWriter, path::PathBuf, str::FromStr};

use ::precellar::{
align::{
extend_fastq_record, Alinger, Barcode, DummyAligner, FastqProcessor, NameCollatedRecords,
extend_fastq_record, Aligner, Barcode, DummyAligner, FastqProcessor, NameCollatedRecords,
},
fragment::FragmentGenerator,
qc::{AlignQC, FragmentQC, Metrics},
Expand Down

0 comments on commit a545e5e

Please sign in to comment.