Skip to content

Commit

Permalink
upgrade seqspec to 0.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Sep 27, 2024
1 parent 5a52c15 commit af733ad
Show file tree
Hide file tree
Showing 4 changed files with 511 additions and 394 deletions.
3 changes: 2 additions & 1 deletion precellar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ multi_reader = "0.1"
noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "async"] }
kdam = "0.5.2"
rayon = "1.10"
smallvec = "1.13"
serde = "1.0"
serde_yaml = "0.9"
regex = "1.6"
yaml-rust = "0.4"
zstd = { version = "0.13", features = ["zstdmt"] }
197 changes: 87 additions & 110 deletions precellar/src/align.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::barcode::{BarcodeCorrector, Whitelist};
use crate::seqspec::{Modality, RegionType, SeqSpec};
use crate::seqspec::{Assay, Modality, Read, Region, RegionType, SequenceType};
use crate::qc::{AlignQC, Metrics};

use bstr::BString;
use kdam::{tqdm, BarExt};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::collections::{HashMap, HashSet};
use std::io::BufRead;
use smallvec::SmallVec;
use anyhow::{bail, Result};
use noodles::{bam, sam, fastq};
use noodles::sam::alignment::{
Expand Down Expand Up @@ -51,9 +53,9 @@ impl Alinger for BurrowsWheelerAligner {

pub struct FastqProcessor<A> {
base_dir: PathBuf,
seqspec: SeqSpec,
assay: Assay,
aligner: A,
current_modality: Option<String>,
current_modality: Option<Modality>,
mito_dna: HashSet<usize>,
metrics: HashMap<Modality, Metrics>,
align_qc: HashMap<Modality, Arc<Mutex<AlignQC>>>,
Expand All @@ -63,9 +65,9 @@ pub struct FastqProcessor<A> {
}

impl<A: Alinger> FastqProcessor<A> {
pub fn new(seqspec: SeqSpec, aligner: A) -> Self {
pub fn new(assay: Assay, aligner: A) -> Self {
Self {
seqspec, aligner, current_modality: None, metrics: HashMap::new(),
assay, aligner, current_modality: None, metrics: HashMap::new(),
align_qc: HashMap::new(), mito_dna: HashSet::new(), base_dir: PathBuf::from("./"),
barcode_correct_prob: 0.975,
}
Expand All @@ -81,23 +83,23 @@ impl<A: Alinger> FastqProcessor<A> {
self
}

pub fn modality(&self) -> &str {
self.current_modality.as_ref().expect("modality not set, please call set_modality first")
pub fn modality(&self) -> Modality {
self.current_modality.expect("modality not set, please call set_modality first")
}

pub fn add_mito_dna(&mut self, mito_dna: &str) {
self.aligner.header().reference_sequences().get_index_of(&BString::from(mito_dna))
.map(|x| self.mito_dna.insert(x));
}

pub fn with_modality(mut self, modality: &str) -> Self {
pub fn with_modality(mut self, modality: Modality) -> Self {
self.current_modality = Some(modality.into());
self
}

pub fn get_report(&self) -> Metrics {
let mut metrics = self.metrics.get(self.modality()).map_or(Metrics::default(), |x| x.clone());
if let Some(align_qc) = self.align_qc.get(self.modality()) {
let mut metrics = self.metrics.get(&self.modality()).map_or(Metrics::default(), |x| x.clone());
if let Some(align_qc) = self.align_qc.get(&self.modality()) {
align_qc.lock().unwrap().report(&mut metrics);
}
metrics
Expand All @@ -116,13 +118,13 @@ impl<A: Alinger> FastqProcessor<A> {
let corrector = BarcodeCorrector::default().with_bc_confidence_threshold(self.barcode_correct_prob);
let header = self.aligner.header();
self.align_qc.insert(
self.modality().into(),
self.modality(),
Arc::new(Mutex::new(AlignQC {
mito_dna: self.mito_dna.clone(),
..AlignQC::default()
}))
);
let align_qc = self.align_qc.get(self.modality()).unwrap().clone();
let align_qc = self.align_qc.get(&self.modality()).unwrap().clone();

let mut progress_bar = tqdm!(total = whitelist.total_count);
fq_records.chunk(self.aligner.chunk_size()).map(move |data| if is_paired {
Expand Down Expand Up @@ -156,7 +158,7 @@ impl<A: Alinger> FastqProcessor<A> {
}
(ali1_, ali2_)
}).collect::<Vec<_>>();
progress_bar.update(results.len());
progress_bar.update(results.len()).unwrap();
Either::Right(results)
} else {
let (barcodes, mut reads): (Vec<_>, Vec<_>) = data.into_iter()
Expand All @@ -179,7 +181,7 @@ impl<A: Alinger> FastqProcessor<A> {
{ align_qc.lock().unwrap().update(&ali, &header); }
ali
}).collect::<Vec<_>>();
progress_bar.update(results.len());
progress_bar.update(results.len()).unwrap();
Either::Left(results)
})
}
Expand All @@ -202,111 +204,88 @@ impl<A: Alinger> FastqProcessor<A> {
})
}

pub fn gen_raw_fastq_records(&self) -> FastqRecords<impl BufRead> {
pub fn gen_raw_fastq_records(&self) -> FastqRecords<impl BufRead> {
let modality = self.modality();
let fq_list: HashMap<_, _> = self.seqspec.modality(modality).unwrap()
.iter_regions()
.filter(|region| region.region_type == RegionType::Fastq &&
region.iter_regions().any(|r|
r.region_type == RegionType::GDNA ||
r.region_type == RegionType::CDNA ||
r.region_type == RegionType::Barcode ||
r.region_type == RegionType::UMI
)
).map(|fq| (&fq.id, fq)).collect();
let mut read_list = HashMap::new();
self.seqspec.sequence_spec.get(modality).unwrap().iter()
.for_each(|read| if fq_list.contains_key(&read.primer_id) {
read_list.insert(&read.primer_id, read);
});
let data = read_list.into_iter().map(|(id, read)| {
let is_reverse = read.is_reverse();
let fq = fq_list.get(id).unwrap();
let regions = if is_reverse {
fq.subregion_range_rev()
} else {
fq.subregion_range()
};
(fq.id.clone(), is_reverse, regions, read.read_fastq(self.base_dir.clone()))
});
let data = self.assay.get_index_of(modality)
.map(|(read, regions)| (read, regions, read.read_fastq(self.base_dir.clone())));
FastqRecords::new(data)
}

fn count_barcodes(&mut self) -> Result<Whitelist> {
let modality = self.modality();
let mut whitelist = self.get_whitelist()?;
let region_with_barcode = self.seqspec.modality(self.modality()).unwrap()
.iter_regions().find(|r|
r.region_type == RegionType::Fastq && r.iter_regions().any(|x| x.region_type == RegionType::Barcode)
).unwrap();

let sequence_read = self.seqspec.get_read_by_primer_id(modality, &region_with_barcode.id).unwrap();
let subregions = if sequence_read.is_reverse() {
region_with_barcode.subregion_range_rev()
} else {
region_with_barcode.subregion_range()
};
let range = subregions.into_iter().find(|x| x.0 == RegionType::Barcode).unwrap().1;

sequence_read.read_fastq(&self.base_dir).records().for_each(|record| {

let (read, index) = self.assay.get_index_of(modality).into_iter()
.find(|(_, index)| index.into_iter().any(|x| x.0.region_type.is_barcode())).unwrap();
let range = index.into_iter().find(|x| x.0.region_type.is_barcode()).unwrap().1;

read.read_fastq(&self.base_dir).records().for_each(|record| {
let mut record = record.unwrap();
let n = record.sequence().len();
record = slice_fastq_record(&record, range.start, range.end.unwrap_or(n));
if sequence_read.is_reverse() {
record = slice_fastq_record(&record, range.start as usize, range.end as usize);
if read.is_reverse() {
record = rev_compl_fastq_record(record);
}
whitelist.count_barcode(std::str::from_utf8(record.sequence()).unwrap(), record.quality_scores());
});

self.metrics.entry(modality.to_string()).or_insert_with(Metrics::default)
self.metrics.entry(modality).or_insert_with(Metrics::default)
.insert("frac_q30_bases_barcode".to_string(), whitelist.frac_q30_bases());

Ok(whitelist)
}

fn get_whitelist(&self) -> Result<Whitelist> {
let regions: Vec<_> = self.seqspec.modality(self.modality()).unwrap()
.iter_regions().filter(|r| r.region_type == RegionType::Barcode).collect();
let regions: Vec<_> = self.assay.get_region_by_modality(self.modality()).unwrap()
.iter_regions().filter(|r| r.region_type.is_barcode()).collect();
if regions.len() != 1 {
bail!("Expecting exactly one barcode region, found {}", regions.len());
}
let region = regions[0];
if region.sequence_type.as_str() == "onlist" {
Ok(Whitelist::new(region.sequence_type.fetch_onlist()?))
if region.sequence_type == SequenceType::Onlist {
Ok(Whitelist::new(region.onlist.as_ref().unwrap().read()?))
} else {
Ok(Whitelist::empty())
}
}
}

pub struct FastqRecords<R> {
ids: Vec<String>,
is_reverse: Vec<bool>,
subregions: Vec<Vec<(RegionType, crate::seqspec::Range)>>,
readers: Vec<fastq::Reader<R>>,
pub struct FastqRecords<R>(Vec<FastqRecord<R>>);

struct FastqRecord<R> {
id: String,
is_reverse: bool,
subregion: Vec<(RegionType, Range<u32>)>,
reader: fastq::Reader<R>,
min_len: usize,
max_len: usize,
}

pub type Barcode = fastq::Record;
pub type UMI = fastq::Record;

impl<R: BufRead> FastqRecords<R> {
fn new<I>(iter: I) -> Self
fn new<'a, I>(iter: I) -> Self
where
I: Iterator<Item = (String, bool, Vec<(RegionType, crate::seqspec::Range)>, fastq::Reader<R>)>,
I: Iterator<Item = (&'a Read, Vec<(&'a Region, Range<u32>)>, fastq::Reader<R>)>,
{
let mut ids = Vec::new();
let mut is_reverse = Vec::new();
let mut subregions = Vec::new();
let mut readers = Vec::new();
iter.for_each(|(f, s, sr, r)| {
ids.push(f);
is_reverse.push(s);
subregions.push(sr.into_iter().filter(|x|
x.0 == RegionType::Barcode || x.0 == RegionType::CDNA || x.0 == RegionType::GDNA
).collect());
readers.push(r);
});
Self { ids, is_reverse, subregions, readers }
let records = iter.map(|(read, regions, reader)|
FastqRecord {
id: read.id().to_string(),
is_reverse: read.is_reverse(),
subregion: regions.into_iter().filter_map(|x| {
let region_type = x.0.region_type;
if region_type.is_barcode() || region_type.is_dna() {
Some((region_type, x.1))
} else {
None
}
}).collect(),
reader,
min_len: read.min_len as usize,
max_len: read.max_len as usize,
}
).collect();
Self(records)
}

fn chunk(self, chunk_size: usize) -> FastqRecordChunk<R> {
Expand All @@ -316,17 +295,12 @@ impl<R: BufRead> FastqRecords<R> {
fn is_paired_end(&self) -> bool {
let mut read1 = false;
let mut read2 = false;
self.subregions.iter().enumerate().for_each(|(i, sr)| {
sr.iter().for_each(|(region_type, _)| {
match region_type {
RegionType::CDNA | RegionType::GDNA => {
if self.is_reverse[i] {
read1 = true;
} else {
read2 = true;
}
},
_ => (),
self.0.iter().for_each(|x| {
x.subregion.iter().for_each(|(region_type, _)| if region_type.is_dna() {
if x.is_reverse {
read1 = true;
} else {
read2 = true;
}
});
});
Expand All @@ -339,16 +313,19 @@ impl<R: BufRead> Iterator for FastqRecords<R> {

fn next(&mut self) -> Option<Self::Item> {
let mut id_without_record = Vec::new();
let records = self.readers.iter_mut().enumerate().map(|(i, reader)| {
let records: SmallVec<[_; 4]> = self.0.iter_mut().map(|x| {
let mut record = fastq::Record::default();
let s = reader.read_record(&mut record).expect("error reading fastq record");
if s == 0 {
id_without_record.push(self.ids[i].as_str());
if x.reader.read_record(&mut record).expect("error reading fastq record") == 0 {
id_without_record.push(x.id.as_str());
None
} else {
let n = record.sequence().len();
if n < x.min_len || n > x.max_len {
panic!("Read length ({}) out of range: {}-{}", n, x.min_len, x.max_len);
}
Some(record)
}
}).collect::<Vec<_>>();
}).collect();
if id_without_record.len() == records.len() {
return None;
} else if id_without_record.len() > 0 {
Expand All @@ -360,21 +337,21 @@ impl<R: BufRead> Iterator for FastqRecords<R> {
let mut read2 = None;
records.iter().enumerate().for_each(|(i, r)| {
let record = r.as_ref().unwrap();
self.subregions[i].iter().for_each(|(region_type, range)| {
let fq = slice_fastq_record(record, range.start, range.end.unwrap_or(record.sequence().len()));
let is_reverse = self.is_reverse[i];
match region_type {
RegionType::Barcode => if is_reverse {
self.0[i].subregion.iter().for_each(|(region_type, range)| {
let fq = slice_fastq_record(record, range.start as usize, range.end as usize);
let is_reverse = self.0[i].is_reverse;
if region_type.is_barcode() {
if is_reverse {
barcode = Some(rev_compl_fastq_record(fq));
} else {
barcode = Some(fq);
},
RegionType::CDNA | RegionType::GDNA => if is_reverse {
}
} else if region_type.is_dna() {
if is_reverse {
read1 = Some(fq);
} else {
read2 = Some(fq);
},
_ => (),
}
}
});
});
Expand Down Expand Up @@ -500,17 +477,17 @@ mod tests {

#[test]
fn test_seqspec_io() {
let spec = SeqSpec::from_path("tests/data/spec.yaml").unwrap();
let spec = Assay::from_path("tests/data/spec.yaml").unwrap();
let aligner = BurrowsWheelerAligner::new(
FMIndex::read("tests/data/hg38").unwrap(),
AlignerOpts::default(),
PairedEndStats::default()
);
let mut processor = FastqProcessor::new(spec, aligner)
.with_modality("atac");
.with_modality(Modality::Atac);

processor.gen_barcoded_alignments().take(6).for_each(|x| {
()
println!("{:?}", x);
});

println!("{}", processor.get_report());
Expand Down
Loading

0 comments on commit af733ad

Please sign in to comment.