From 2a3ed5f5e76093e23786ca700e67ea9e869ad90b Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Sun, 22 Dec 2024 11:07:16 +0800 Subject: [PATCH] implement gene quantification (#7) --- precellar/Cargo.toml | 7 +- precellar/src/align/aligners.rs | 315 ++++++++++++ precellar/src/{align.rs => align/fastq.rs} | 486 +++++------------- precellar/src/align/mod.rs | 5 + precellar/src/barcode.rs | 29 +- precellar/src/fragment.rs | 29 +- .../fragment/{deduplicate.rs => de_dups.rs} | 30 +- precellar/src/lib.rs | 4 +- precellar/src/qc.rs | 243 ++++----- precellar/src/transcript/annotate.rs | 470 +++++++++++++++++ precellar/src/transcript/de_dups.rs | 74 +++ precellar/src/transcript/mod.rs | 8 + precellar/src/transcript/quantification.rs | 182 +++++++ precellar/src/transcript/transcriptome.rs | 476 +++++++++++++++++ python/Cargo.toml | 7 +- python/src/aligner.rs | 35 ++ python/src/lib.rs | 148 ++++-- seqspec/src/lib.rs | 54 +- seqspec/src/read.rs | 220 +++++--- seqspec/src/region.rs | 10 + seqspec_templates/10x_rna_atac.yaml | 15 +- 21 files changed, 2155 insertions(+), 692 deletions(-) create mode 100644 precellar/src/align/aligners.rs rename precellar/src/{align.rs => align/fastq.rs} (57%) create mode 100644 precellar/src/align/mod.rs rename precellar/src/fragment/{deduplicate.rs => de_dups.rs} (94%) create mode 100644 precellar/src/transcript/annotate.rs create mode 100644 precellar/src/transcript/de_dups.rs create mode 100644 precellar/src/transcript/mod.rs create mode 100644 precellar/src/transcript/quantification.rs create mode 100644 precellar/src/transcript/transcriptome.rs create mode 100644 python/src/aligner.rs diff --git a/precellar/Cargo.toml b/precellar/Cargo.toml index c23f016..a852747 100644 --- a/precellar/Cargo.toml +++ b/precellar/Cargo.toml @@ -5,16 +5,15 @@ edition = "2021" [dependencies] anyhow = "1.0" -bed-utils = "0.5.1" +bed-utils = "0.6" bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "8de06bcc0a2145fd819232ffb2bf100fb795db30" } -star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "faef1085eaf26e6e8d5875fcbc641c3af9444d89" } +star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "4672820b6a2c49ef514f9160e08188928d45a874" } bstr = "1.0" -either = "1.13" itertools = "0.13" indexmap = "2.5" log = "0.4" lexical = "6.1" -noodles = { version = "0.85", features = ["core", "fastq", "bam", "sam", "async"] } +noodles = { version = "0.85", features = ["core", "gtf", "fastq", "bam", "sam", "async"] } kdam = "0.5.2" rayon = "1.10" smallvec = "1.13" diff --git a/precellar/src/align/aligners.rs b/precellar/src/align/aligners.rs new file mode 100644 index 0000000..b91a31c --- /dev/null +++ b/precellar/src/align/aligners.rs @@ -0,0 +1,315 @@ +use std::path::Path; + +/// This module provides an abstraction for aligning sequencing reads using different alignment tools like BWA and STAR. +use super::fastq::AnnotatedFastq; +use crate::barcode::{get_barcode, get_umi}; +use crate::transcript::Transcript; + +use anyhow::{bail, ensure, Result}; +pub use bwa_mem2::BurrowsWheelerAligner; +use noodles::sam::alignment::Record; +pub use star_aligner::StarAligner; + +use bwa_mem2::{AlignerOpts, FMIndex, PairedEndStats}; +use noodles::sam; +use noodles::sam::alignment::record::data::field::tag::Tag; +use noodles::sam::alignment::record_buf::{data::field::value::Value, RecordBuf}; +use rayon::iter::ParallelIterator; +use rayon::slice::ParallelSlice; +use star_aligner::StarOpts; + +pub type MultiMapR = MultiMap; + +/// Represents a set of alignments (primary and optional secondary alignments) for a single sequencing read. +#[derive(Debug, Clone)] +pub struct MultiMap { + /// The primary alignment for the read. + pub primary: R, + /// Optional secondary alignments for the read. + pub others: Option>, +} + +impl MultiMap { + /// Constructs a new `MultiMap`. + /// + /// # Arguments + /// * `primary` - The primary alignment for the read. + /// * `others` - Optional secondary alignments for the read. + pub fn new(primary: R, others: Option>) -> Self { + Self { primary, others } + } + + /// Return the number of records. + pub fn len(&self) -> usize { + self.others.as_ref().map_or(0, |x| x.len()) + 1 + } + + pub fn barcode(&self) -> Result> { + get_barcode(&self.primary) + } + + pub fn umi(&self) -> Result> { + get_umi(&self.primary) + } + + /// Consumes the `MultiMap` and returns the primary alignment. + pub fn into_primary(self) -> R { + self.primary + } + + /// Returns an iterator over all alignments (primary and secondary). + pub fn iter(&self) -> impl Iterator { + std::iter::once(&self.primary).chain(self.others.iter().flatten()) + } +} + +impl From for MultiMap { + fn from(record: R) -> Self { + Self { + primary: record, + others: None, + } + } +} + +impl TryFrom> for MultiMap { + type Error = anyhow::Error; + + fn try_from(mut vec: Vec) -> Result { + let n = vec.len(); + if n == 0 { + Err(anyhow::anyhow!("No alignments")) + } else if n == 1 { + Ok(MultiMap::from(vec.into_iter().next().unwrap())) + } else { + let mut primary = None; + vec.iter().enumerate().try_for_each(|(i, rec)| { + if !rec.flags()?.is_secondary() { + if primary.is_some() { + bail!("Multiple primary alignments"); + } else { + primary = Some(i); + } + } + Ok(()) + })?; + ensure!(primary.is_some(), "No primary alignment"); + + Ok(MultiMap::new(vec.swap_remove(primary.unwrap()), Some(vec))) + } + } +} + +/// Trait defining the behavior of aligners like BWA and STAR. +pub trait Aligner { + /// Creates a new aligner instance from a reference genome index path. + fn from_path>(path: P) -> Self; + + /// Retrieves the SAM header associated with the aligner. + fn header(&self) -> sam::Header; + + /// Aligns a batch of sequencing reads. + /// + /// # Arguments + /// * `num_threads` - Number of threads to use for alignment. + /// * `records` - Vector of annotated FASTQ records to align. + /// + /// # Returns + /// A vector of tuples where each tuple contains the primary alignment and optional secondary alignments for a read. + fn align_reads( + &mut self, + num_threads: u16, + records: Vec, + ) -> Vec<(MultiMapR, Option)>; +} + +impl Aligner for BurrowsWheelerAligner { + fn from_path>(path: P) -> Self { + BurrowsWheelerAligner::new( + FMIndex::read(path).unwrap(), + AlignerOpts::default(), + PairedEndStats::default(), + ) + } + + fn header(&self) -> sam::Header { + self.get_sam_header() + } + + fn align_reads( + &mut self, + num_threads: u16, + records: Vec, + ) -> Vec<(MultiMapR, Option)> { + if records[0].read2.is_some() { + let (info, mut reads): (Vec<_>, Vec<_>) = records + .into_iter() + .map(|rec| { + ( + (rec.barcode.unwrap(), rec.umi), + (rec.read1.unwrap(), rec.read2.unwrap()), + ) + }) + .unzip(); + self.align_read_pairs(num_threads, &mut reads) + .enumerate() + .map(|(i, (mut ali1, mut ali2))| { + let (bc, umi) = info.get(i).unwrap(); + add_cell_barcode( + &mut ali1, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ); + add_cell_barcode( + &mut ali2, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ); + if let Some(umi) = umi { + add_umi(&mut ali1, umi.sequence(), umi.quality_scores()); + add_umi(&mut ali2, umi.sequence(), umi.quality_scores()); + } + (ali1.into(), Some(ali2.into())) + }) + .collect() + } else { + let (info, mut reads): (Vec<_>, Vec<_>) = records + .into_iter() + .map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap())) + .unzip(); + + self.align_reads(num_threads, reads.as_mut_slice()) + .enumerate() + .map(|(i, mut alignment)| { + let (bc, umi) = info.get(i).unwrap(); + add_cell_barcode( + &mut alignment, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ); + if let Some(umi) = umi { + add_umi(&mut alignment, umi.sequence(), umi.quality_scores()); + } + (alignment.into(), None) + }) + .collect() + } + } +} + +impl Aligner for StarAligner { + fn from_path>(path: P) -> Self { + let opts = StarOpts::new(path); + StarAligner::new(opts).unwrap() + } + + fn header(&self) -> sam::Header { + self.get_header().clone() + } + + fn align_reads( + &mut self, + num_threads: u16, + records: Vec, + ) -> Vec<(MultiMapR, Option)> { + let chunk_size = get_chunk_size(records.len(), num_threads as usize); + + records + .par_chunks(chunk_size) + .flat_map_iter(|chunk| { + let mut aligner = self.clone(); + chunk.iter().map(move |rec| { + let bc = rec.barcode.as_ref().unwrap(); + let read1; + let mut read2 = None; + if rec.read1.is_some() { + read1 = rec.read1.as_ref().unwrap(); + read2 = rec.read2.as_ref(); + } else { + read1 = rec.read2.as_ref().unwrap(); + } + + if read2.is_some() { + let (mut ali1, mut ali2) = + aligner.align_read_pair(read1, &read2.unwrap()).unwrap(); + ali1.iter_mut() + .chain(ali2.iter_mut()) + .for_each(|alignment| { + add_cell_barcode( + alignment, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ); + if let Some(umi) = &rec.umi { + add_umi(alignment, umi.sequence(), umi.quality_scores()); + }; + }); + (ali1.try_into().unwrap(), Some(ali2.try_into().unwrap())) + } else { + let mut ali = aligner.align_read(read1).unwrap(); + ali.iter_mut().for_each(|alignment| { + add_cell_barcode( + alignment, + bc.raw.sequence(), + bc.raw.quality_scores(), + bc.corrected.as_deref(), + ); + if let Some(umi) = &rec.umi { + add_umi(alignment, umi.sequence(), umi.quality_scores()); + }; + }); + (ali.try_into().unwrap(), None) + } + }) + }) + .collect() + } +} + +pub fn read_transcriptome_star>(dir: P) -> Result> { + let transcriptome = star_aligner::transcript::Transcriptome::from_path(dir)?; + transcriptome + .iter() + .map(|t| t.clone().try_into()) + .collect() +} + +fn get_chunk_size(total_length: usize, num_threads: usize) -> usize { + let chunk_size = total_length / num_threads; + if chunk_size == 0 { + 1 + } else { + chunk_size + } +} + +// Additional helper functions for adding metadata like cell barcodes and UMIs to alignments. +fn add_cell_barcode( + record_buf: &mut RecordBuf, + ori_barcode: &[u8], + ori_qual: &[u8], + correct_barcode: Option<&[u8]>, +) { + let data = record_buf.data_mut(); + data.insert( + Tag::CELL_BARCODE_SEQUENCE, + Value::String(ori_barcode.into()), + ); + data.insert( + Tag::CELL_BARCODE_QUALITY_SCORES, + Value::String(ori_qual.into()), + ); + if let Some(barcode) = correct_barcode { + data.insert(Tag::CELL_BARCODE_ID, Value::String(barcode.into())); + } +} + +fn add_umi(record_buf: &mut RecordBuf, umi: &[u8], qual: &[u8]) { + let data = record_buf.data_mut(); + data.insert(Tag::UMI_SEQUENCE, Value::String(umi.into())); + data.insert(Tag::UMI_QUALITY_SCORES, Value::String(qual.into())); +} diff --git a/precellar/src/align.rs b/precellar/src/align/fastq.rs similarity index 57% rename from precellar/src/align.rs rename to precellar/src/align/fastq.rs index 5ea6c8a..c83a085 100644 --- a/precellar/src/align.rs +++ b/precellar/src/align/fastq.rs @@ -1,237 +1,25 @@ +use super::aligners::{Aligner, MultiMap, MultiMapR}; + +use crate::adapter::trim_poly_nucleotide; use crate::barcode::{BarcodeCorrector, OligoFrequncy, Whitelist}; use crate::qc::{AlignQC, Metrics}; use anyhow::{bail, Result}; use bstr::BString; -use bwa_mem2::BurrowsWheelerAligner; -use either::Either; use indexmap::IndexMap; use kdam::{tqdm, BarExt}; use log::info; -use noodles::sam::alignment::record_buf::{data::field::value::Value, RecordBuf}; -use noodles::sam::alignment::record::data::field::tag::Tag; -use noodles::{bam, fastq, sam}; -use rayon::iter::ParallelIterator; -use rayon::slice::ParallelSlice; -use seqspec::{Assay, Modality, Read, RegionId, RegionIndex, RegionType}; +use noodles::{bam, fastq}; +use seqspec::{Assay, Modality, Read, RegionId, SegmentInfo, SegmentInfoElem}; use smallvec::SmallVec; -use star_aligner::StarAligner; use std::{ collections::{HashMap, HashSet}, io::BufRead, - ops::Range, }; -pub trait AsIterator { - type Item; - type AsIter<'a>: Iterator - where - Self: 'a; - - fn as_iter(&self) -> Self::AsIter<'_>; -} - -impl AsIterator for RecordBuf { - type Item = RecordBuf; - type AsIter<'a> = std::iter::Once<&'a RecordBuf>; - - fn as_iter(&self) -> Self::AsIter<'_> { - std::iter::once(&self) - } -} - -impl AsIterator for Vec { - type Item = RecordBuf; - type AsIter<'a> = std::slice::Iter<'a, RecordBuf>; - - fn as_iter(&self) -> Self::AsIter<'_> { - self.iter() - } -} - -pub trait Aligner { - type AlignOutput: AsIterator; - - fn header(&self) -> sam::Header; - - fn align_reads( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec; - - fn align_read_pairs( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec<(Self::AlignOutput, Self::AlignOutput)>; -} - -pub struct DummyAligner; - -impl Aligner for DummyAligner { - type AlignOutput = RecordBuf; - - fn header(&self) -> sam::Header { - sam::Header::default() - } - - fn align_reads(&mut self, _: u16, _: Vec) -> Vec { - Vec::new() - } - - fn align_read_pairs( - &mut self, - _: u16, - _: Vec, - ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { - Vec::new() - } -} - -impl Aligner for BurrowsWheelerAligner { - type AlignOutput = RecordBuf; - - fn header(&self) -> sam::Header { - self.get_sam_header() - } - - fn align_reads( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec { - let (info, mut reads): (Vec<_>, Vec<_>) = records - .into_iter() - .map(|rec| ((rec.barcode.unwrap(), rec.umi), rec.read1.unwrap())) - .unzip(); - - // TODO: add UMI - self.align_reads(num_threads, reads.as_mut_slice()) - .enumerate() - .map(|(i, mut alignment)| { - let (bc, umi) = info.get(i).unwrap(); - add_cell_barcode( - &mut alignment, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ); - alignment - }) - .collect() - } - - fn align_read_pairs( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { - let (info, mut reads): (Vec<_>, Vec<_>) = records - .into_iter() - .map(|rec| { - ( - (rec.barcode.unwrap(), rec.umi), - (rec.read1.unwrap(), rec.read2.unwrap()), - ) - }) - .unzip(); - self.align_read_pairs(num_threads, &mut reads) - .enumerate() - .map(|(i, (mut ali1, mut ali2))| { - let (bc, umi) = info.get(i).unwrap(); - add_cell_barcode( - &mut ali1, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ); - add_cell_barcode( - &mut ali2, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ); - (ali1, ali2) - }) - .collect() - } -} - -impl Aligner for StarAligner { - type AlignOutput = Vec; - - fn header(&self) -> sam::Header { - self.get_header().clone() - } - - fn align_reads( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec { - let chunk_size = get_chunk_size(records.len(), num_threads as usize); - - records.par_chunks(chunk_size).flat_map_iter(|chunk| { - let mut aligner = self.clone(); - chunk.iter().map(move |rec| { - let bc = rec.barcode.as_ref().unwrap(); - let mut ali = aligner.align_read(&rec.read1.as_ref().unwrap()).unwrap(); - ali.iter_mut().for_each(|alignment| - add_cell_barcode( - alignment, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - ); - ali - }) - }).collect() - } - - fn align_read_pairs( - &mut self, - num_threads: u16, - records: Vec, - ) -> Vec<(Self::AlignOutput, Self::AlignOutput)> { - let chunk_size = get_chunk_size(records.len(), num_threads as usize); - - records.par_chunks(chunk_size).flat_map_iter(|chunk| { - let mut aligner = self.clone(); - chunk.iter().map(move |rec| { - let bc = rec.barcode.as_ref().unwrap(); - let (mut ali1, mut ali2) = aligner.align_read_pair( - &rec.read1.as_ref().unwrap(), - &rec.read2.as_ref().unwrap() - ).unwrap(); - ali1.iter_mut().chain(ali2.iter_mut()).for_each(|alignment| - add_cell_barcode( - alignment, - bc.raw.sequence(), - bc.raw.quality_scores(), - bc.corrected.as_deref(), - ) - ); - (ali1, ali2) - }).collect::>() - }).collect() - } -} - -fn get_chunk_size(total_length: usize, num_threads: usize) -> usize { - let chunk_size = total_length / num_threads; - if chunk_size == 0 { - 1 - } else { - chunk_size - } -} - -pub struct FastqProcessor { +pub struct FastqProcessor { assay: Assay, - aligner: A, current_modality: Option, - mito_dna: HashSet, + mito_dna: HashSet, metrics: HashMap, align_qc: HashMap, barcode_correct_prob: f64, // if the posterior probability of a correction @@ -240,11 +28,10 @@ pub struct FastqProcessor { mismatch_in_barcode: usize, // The number of mismatches allowed in barcode } -impl FastqProcessor { - pub fn new(assay: Assay, aligner: A) -> Self { +impl FastqProcessor { + pub fn new(assay: Assay) -> Self { Self { assay, - aligner, current_modality: None, metrics: HashMap::new(), align_qc: HashMap::new(), @@ -264,12 +51,8 @@ impl FastqProcessor { .expect("modality not set, please call set_modality first") } - pub fn add_mito_dna(&mut self, mito_dna: &str) { - self.aligner - .header() - .reference_sequences() - .get_index_of(&BString::from(mito_dna)) - .map(|x| self.mito_dna.insert(x)); + pub fn add_mito_dna(&mut self, mito_dna: impl Into) { + self.mito_dna.insert(mito_dna.into()); } pub fn with_modality(mut self, modality: Modality) -> Self { @@ -291,56 +74,46 @@ impl FastqProcessor { /// Align reads and return the alignments. /// If the fastq file is paired-end, the alignments will be returned as a tuple. /// Otherwise, the alignments will be returned as a single vector. - /// + /// /// # Arguments - /// + /// /// * `num_threads` - The number of threads to use for alignment. /// * `chunk_size` - The maximum number of bases in a chunk. - /// + /// /// # Returns - /// + /// /// An iterator of alignments. If the fastq file is paired-end, the alignments will be returned as a tuple. /// Otherwise, the alignments will be returned as a single vector. - pub fn gen_barcoded_alignments( - &mut self, + pub fn gen_barcoded_alignments<'a, A: Aligner>( + &'a mut self, + aligner: &'a mut A, num_threads: u16, chunk_size: usize, - ) -> impl Iterator, Vec<(A::AlignOutput, A::AlignOutput)>>> + '_ - { + ) -> impl Iterator)>> + 'a { let fq_reader = self.gen_barcoded_fastq(true); - let is_paired = fq_reader.is_paired_end(); - let modality = self.modality(); + let total_reads = fq_reader.total_reads.unwrap_or(0); - info!("Aligning reads..."); - let header = self.aligner.header(); - self.align_qc.insert( - modality, - AlignQC { - mito_dna: self.mito_dna.clone(), - ..AlignQC::default() - }, - ); + let modality = self.modality(); + info!("Aligning {} reads...", total_reads); + let header = aligner.header(); + let mut qc = AlignQC::default(); + self.mito_dna.iter().for_each(|mito| { + header.reference_sequences() + .get_index_of(&BString::from(mito.as_str())) + .map(|x| qc.mito_dna.insert(x)); + }); + self.align_qc.insert(modality, qc); - let mut progress_bar = tqdm!(total = fq_reader.total_reads.unwrap_or(0)); + let mut progress_bar = tqdm!(total = total_reads); let fq_reader = VectorChunk::new(fq_reader, chunk_size); fq_reader.map(move |data| { let align_qc = self.align_qc.get_mut(&modality).unwrap(); - if is_paired { - let results: Vec<_> = self.aligner.align_read_pairs(num_threads, data); - results.iter().for_each(|(ali1, ali2)| { - ali1.as_iter().for_each(|x| align_qc.update(x, &header)); - ali2.as_iter().for_each(|x| align_qc.update(x, &header)); - }); - progress_bar.update(results.len()).unwrap(); - Either::Right(results) - } else { - let results: Vec<_> = self.aligner.align_reads(num_threads, data); - results.iter().for_each(|ali| { - ali.as_iter().for_each(|x| align_qc.update(x, &header)); - }); - progress_bar.update(results.len()).unwrap(); - Either::Left(results) - } + let results: Vec<_> = aligner.align_reads(num_threads, data); + results + .iter() + .for_each(|(ali1, ali2)| align_qc.add(&header, ali1, ali2.as_ref()).unwrap()); + progress_bar.update(results.len()).unwrap(); + results }) } @@ -370,7 +143,7 @@ impl FastqProcessor { let mut fq_reader: AnnotatedFastqReader = self .assay - .get_index_by_modality(modality) + .get_segments_by_modality(modality) .filter_map(|(read, index)| { let annotator = FastqAnnotator::new(read, index, &whitelists, corrector.clone())?; Some((annotator, read.open().unwrap())) @@ -388,15 +161,15 @@ impl FastqProcessor { fn count( read: &Read, - barcode_region_index: RegionIndex, + barcode_region_index: SegmentInfo, whitelist: &mut Whitelist, ) -> Result<()> { let range = &barcode_region_index - .index + .segments .iter() - .find(|x| x.1.is_barcode()) + .find(|x| x.region_type.is_barcode()) .unwrap() - .2; + .range; read.open().unwrap().records().for_each(|record| { let mut record = record.unwrap(); record = slice_fastq_record(&record, range.start as usize, range.end as usize); @@ -410,8 +183,13 @@ impl FastqProcessor { for (i, (read, barcode_region_index)) in self .assay - .get_index_by_modality(modality) - .filter(|(_, region_index)| region_index.index.iter().any(|x| x.1.is_barcode())) + .get_segments_by_modality(modality) + .filter(|(_, region_index)| { + region_index + .segments + .iter() + .any(|x| x.region_type.is_barcode()) + }) .enumerate() { count(read, barcode_region_index, &mut whitelists[i])?; @@ -457,10 +235,16 @@ impl FastqProcessor { pub struct AnnotatedFastqReader { buffer: fastq::Record, total_reads: Option, + trim_poly_a: bool, inner: Vec<(FastqAnnotator, fastq::Reader>)>, } impl AnnotatedFastqReader { + pub fn with_polya_trimmed(mut self) -> Self { + self.trim_poly_a = true; + self + } + pub fn get_all_barcodes(&self) -> Vec<(&str, usize)> { self.inner .iter() @@ -468,8 +252,8 @@ impl AnnotatedFastqReader { annotator .subregions .iter() - .filter(|(_, region_type, _)| region_type.is_barcode()) - .map(|(id, _, r)| (id.as_str(), r.len())) + .filter(|info| info.region_type.is_barcode()) + .map(|info| (info.region_id.as_str(), info.range.len())) }) .collect() } @@ -481,8 +265,8 @@ impl AnnotatedFastqReader { annotator .subregions .iter() - .filter(|(_, region_type, _)| region_type.is_umi()) - .map(|(id, _, r)| (id.as_str(), r.len())) + .filter(|info| info.region_type.is_umi()) + .map(|info| (info.region_id.as_str(), info.range.len())) }) .collect() } @@ -491,8 +275,8 @@ impl AnnotatedFastqReader { let mut has_read1 = false; let mut has_read2 = false; self.inner.iter().for_each(|x| { - x.0.subregions.iter().for_each(|(_, region_type, _)| { - if region_type.is_target() { + x.0.subregions.iter().for_each(|info| { + if info.region_type.is_target() { if x.0.is_reverse { has_read1 = true; } else { @@ -513,12 +297,13 @@ impl FromIterator<(FastqAnnotator, fastq::Reader>)> for Annotat buffer: fastq::Record::default(), total_reads: None, inner: iter.into_iter().collect(), + trim_poly_a: false, } } } impl Iterator for AnnotatedFastqReader { - type Item = AnnotatedRecord; + type Item = AnnotatedFastq; fn next(&mut self) -> Option { let mut missing = None; @@ -544,27 +329,27 @@ impl Iterator for AnnotatedFastqReader { } else if missing.is_some() { panic!("Missing records in this file: {}", missing.unwrap()); } else { - Some( - records - .into_iter() - .reduce(|mut this, other| { - this.join(other); - this - }) - .unwrap(), - ) + let result = records + .into_iter() + .reduce(|mut this, other| { + this.join(other); + this + }) + .unwrap(); + Some(result) } } } /// A FastqAnnotator that splits the reads into subregions, e.g., barcode, UMI, and /// return annotated reads. +#[derive(Debug)] struct FastqAnnotator { whitelists: IndexMap, corrector: BarcodeCorrector, id: String, is_reverse: bool, - subregions: Vec<(String, RegionType, Range)>, + subregions: Vec, min_len: usize, max_len: usize, } @@ -572,23 +357,25 @@ struct FastqAnnotator { impl FastqAnnotator { pub fn new( read: &Read, - index: RegionIndex, + index: SegmentInfo, whitelists: &IndexMap, corrector: BarcodeCorrector, ) -> Option { let subregions: Vec<_> = index - .index + .segments .into_iter() - .filter(|x| x.1.is_barcode() || x.1.is_umi() || x.1.is_target()) // only barcode and target regions + .filter(|x| { + x.region_type.is_barcode() || x.region_type.is_umi() || x.region_type.is_target() + }) // only barcode and target regions .collect(); if subregions.is_empty() { None } else { let whitelists = subregions .iter() - .flat_map(|(id, _, _)| { - let v = whitelists.get(id)?; - Some((id.clone(), v.get_barcode_counts().clone())) + .flat_map(|info| { + let v = whitelists.get(&info.region_id)?; + Some((info.region_id.clone(), v.get_barcode_counts().clone())) }) .collect(); let anno = Self { @@ -604,7 +391,7 @@ impl FastqAnnotator { } } - fn annotate(&self, record: &fastq::Record) -> Result { + fn annotate(&self, record: &fastq::Record) -> Result { let n = record.sequence().len(); if n < self.min_len || n > self.max_len { bail!( @@ -619,43 +406,51 @@ impl FastqAnnotator { let mut umi = None; let mut read1 = None; let mut read2 = None; - self.subregions.iter().for_each(|(id, region_type, range)| { - let mut fq = slice_fastq_record(record, range.start as usize, range.end as usize); - if self.is_reverse && (region_type.is_barcode() || region_type.is_umi()) { + self.subregions.iter().for_each(|info| { + let mut fq = + slice_fastq_record(record, info.range.start as usize, info.range.end as usize); + if self.is_reverse && (info.region_type.is_barcode() || info.region_type.is_umi()) { fq = rev_compl_fastq_record(fq); } - if region_type.is_umi() { + if info.region_type.is_umi() { umi = Some(fq); - } else if region_type.is_barcode() { - let corrected = - self.whitelists - .get(id) - .map_or(Some(fq.sequence().to_vec()), |counts| { - self.corrector - .correct(counts, fq.sequence(), fq.quality_scores()) - .ok() - .map(|x| x.to_vec()) - }); + } else if info.region_type.is_barcode() { + let corrected = self.whitelists.get(&info.region_id).map_or( + Some(fq.sequence().to_vec()), + |counts| { + self.corrector + .correct(counts, fq.sequence(), fq.quality_scores()) + .ok() + .map(|x| x.to_vec()) + }, + ); if let Some(bc) = &mut barcode { bc.extend(&Barcode { raw: fq, corrected }); } else { barcode = Some(Barcode { raw: fq, corrected }); } - } else if region_type.is_target() { - if self.is_reverse { - if let Some(s) = &mut read2 { - extend_fastq_record(s, &fq); - } else { - read2 = Some(fq); - } - } else if let Some(s) = &mut read1 { - extend_fastq_record(s, &fq); + } else if info.region_type.is_target() { + if read1.is_some() || read2.is_some() { + panic!("Both Read1 and Read2 are set"); } else { - read1 = Some(fq); + if let Some(nucl) = info.region_type.poly_nucl() { + if let Some(idx) = trim_poly_nucleotide(nucl, fq.sequence().iter().copied()) + { + fq = slice_fastq_record(&fq, idx, fq.sequence().len()); + } + } + // Only keep reads with length >= 8 + if fq.sequence().len() >= 8 { + if self.is_reverse { + read2 = Some(fq); + } else { + read1 = Some(fq); + } + } } } }); - Ok(AnnotatedRecord { + Ok(AnnotatedFastq { barcode, umi, read1, @@ -664,6 +459,7 @@ impl FastqAnnotator { } } +#[derive(Debug)] pub struct Barcode { pub raw: fastq::Record, pub corrected: Option>, @@ -685,14 +481,15 @@ impl Barcode { pub type UMI = fastq::Record; /// An annotated fastq record with barcode, UMI, and sequence. -pub struct AnnotatedRecord { +#[derive(Debug)] +pub struct AnnotatedFastq { pub barcode: Option, pub umi: Option, pub read1: Option, pub read2: Option, } -impl AnnotatedRecord { +impl AnnotatedFastq { /// The total number of bases, including read1 and read2, in the record. pub fn len(&self) -> usize { self.read1.as_ref().map_or(0, |x| x.sequence().len()) @@ -703,7 +500,7 @@ impl AnnotatedRecord { } } -impl AnnotatedRecord { +impl AnnotatedFastq { pub fn join(&mut self, other: Self) { if let Some(bc) = &mut self.barcode { if let Some(x) = other.barcode.as_ref() { @@ -741,7 +538,7 @@ impl AnnotatedRecord { pub struct VectorChunk { inner: I, - chunk_size: usize, // The maximum number of bases in a chunk + chunk_size: usize, // The maximum number of bases in a chunk } impl VectorChunk { @@ -750,7 +547,7 @@ impl VectorChunk { } } -impl> Iterator for VectorChunk { +impl> Iterator for VectorChunk { type Item = Vec; fn next(&mut self) -> Option { @@ -773,26 +570,6 @@ impl> Iterator for VectorChunk { } } -fn add_cell_barcode( - record_buf: &mut RecordBuf, - ori_barcode: &[u8], - ori_qual: &[u8], - correct_barcode: Option<&[u8]>, -) { - let data = record_buf.data_mut(); - data.insert( - Tag::CELL_BARCODE_SEQUENCE, - Value::String(ori_barcode.into()), - ); - data.insert( - Tag::CELL_BARCODE_QUALITY_SCORES, - Value::String(ori_qual.into()), - ); - if let Some(barcode) = correct_barcode { - data.insert(Tag::CELL_BARCODE_ID, Value::String(barcode.into())); - } -} - fn slice_fastq_record(record: &fastq::Record, start: usize, end: usize) -> fastq::Record { fastq::Record::new( record.definition().clone(), @@ -849,14 +626,14 @@ impl<'a, R: std::io::Read> NameCollatedRecords<'a, R> { } impl<'a, R: std::io::Read> Iterator for NameCollatedRecords<'a, R> { - type Item = (bam::Record, bam::Record); + type Item = (MultiMap, MultiMap); fn next(&mut self) -> Option { let record = self.records.next()?.unwrap(); let name = record.name().unwrap().to_owned(); if let Some((prev_name, prev_record)) = self.prev_record.take() { if name == prev_name { - Some((prev_record, record)) + Some((prev_record.into(), record.into())) } else { panic!( "Expecting paired end reads with the same name, found {} and {}", @@ -873,23 +650,26 @@ impl<'a, R: std::io::Read> Iterator for NameCollatedRecords<'a, R> { #[cfg(test)] mod tests { - use bwa_mem2::{AlignerOpts, FMIndex, PairedEndStats}; + use bwa_mem2::{AlignerOpts, BurrowsWheelerAligner, FMIndex, PairedEndStats}; use super::*; #[test] fn test_seqspec_io() { let spec = Assay::from_path("tests/data/spec.yaml").unwrap(); - let aligner = BurrowsWheelerAligner::new( + let mut aligner = BurrowsWheelerAligner::new( FMIndex::read("tests/data/hg38").unwrap(), AlignerOpts::default(), PairedEndStats::default(), ); - let mut processor = FastqProcessor::new(spec, aligner).with_modality(Modality::ATAC); + let mut processor = FastqProcessor::new(spec).with_modality(Modality::ATAC); - processor.gen_barcoded_alignments(8, 50000).take(6).for_each(|x| { - println!("{:?}", x); - }); + processor + .gen_barcoded_alignments(&mut aligner, 8, 50000) + .take(6) + .for_each(|x| { + println!("{:?}", x); + }); println!("{}", processor.get_report()); } diff --git a/precellar/src/align/mod.rs b/precellar/src/align/mod.rs new file mode 100644 index 0000000..3ebdacc --- /dev/null +++ b/precellar/src/align/mod.rs @@ -0,0 +1,5 @@ +mod aligners; +mod fastq; + +pub use aligners::{Aligner, BurrowsWheelerAligner, MultiMap, MultiMapR, StarAligner, read_transcriptome_star}; +pub use fastq::{extend_fastq_record, Barcode, FastqProcessor, NameCollatedRecords}; diff --git a/precellar/src/barcode.rs b/precellar/src/barcode.rs index 64c4e07..2a0e97e 100644 --- a/precellar/src/barcode.rs +++ b/precellar/src/barcode.rs @@ -1,4 +1,9 @@ +use anyhow::Result; use core::f64; +use noodles::sam::alignment::{ + record::data::field::{Tag, Value}, + Record, +}; use std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -278,7 +283,7 @@ pub struct BarcodeCorrector { /// exceeds this threshold, the barcode will be corrected. bc_confidence_threshold: f64, /// The number of mismatches allowed in barcode - max_mismatch: usize, + max_mismatch: usize, } impl Default for BarcodeCorrector { @@ -346,3 +351,25 @@ fn error_probability(qual: u8) -> f64 { let offset = 33.0; // Illumina quality score offset 10f64.powf(-((qual as f64 - offset) / 10.0)) } + +pub(crate) fn get_barcode(rec: &R) -> Result> { + Ok(rec + .data() + .get(&Tag::CELL_BARCODE_ID) + .transpose()? + .and_then(|x| match x { + Value::String(barcode) => Some(barcode.to_string()), + _ => None, + })) +} + +pub(crate) fn get_umi(rec: &R) -> Result> { + Ok(rec + .data() + .get(&Tag::UMI_SEQUENCE) + .transpose()? + .and_then(|x| match x { + Value::String(umi) => Some(umi.to_string()), + _ => None, + })) +} diff --git a/precellar/src/fragment.rs b/precellar/src/fragment.rs index 0bf0dc7..27760ba 100644 --- a/precellar/src/fragment.rs +++ b/precellar/src/fragment.rs @@ -1,12 +1,11 @@ -mod deduplicate; +mod de_dups; use anyhow::Result; use bed_utils::{ bed::{BEDLike, ParseError, Strand}, extsort::ExternalSorterBuilder, }; -use deduplicate::{remove_duplicates, AlignmentInfo}; -use either::Either; +use de_dups::{remove_duplicates, AlignmentInfo}; use itertools::Itertools; use noodles::sam::{ alignment::{record::Flags, Record}, @@ -16,6 +15,8 @@ use rayon::prelude::ParallelSliceMut; use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use crate::align::MultiMap; + pub type CellBarcode = String; /// Fragments from single-cell ATAC-seq experiment. Each fragment is represented @@ -173,25 +174,25 @@ impl FragmentGenerator { impl FnMut(&AlignmentInfo) -> String + 'a, > where - I: Iterator, Vec<(R, R)>>> + 'a, + I: Iterator, Option>)>> + 'a, R: Record + 'a, { - let data = records.flat_map(|x| match x { - Either::Left(chunk) => Box::new(chunk.into_iter().flat_map(|r| { - if filter_read(&r, self.mapq) { - AlignmentInfo::from_read(&r, header).unwrap() + let data = records.flat_map(|chunk| + chunk.into_iter().flat_map(|(r1, r2)| if r2.is_some() { + let r2 = r2.unwrap(); + if filter_read_pair((&r1.primary, &r2.primary), self.mapq) { + AlignmentInfo::from_read_pair((&r1.primary, &r2.primary), header).unwrap() } else { None } - })) as Box>, - Either::Right(chunk) => Box::new(chunk.into_iter().flat_map(|(r1, r2)| { - if filter_read_pair((&r1, &r2), self.mapq) { - AlignmentInfo::from_read_pair((&r1, &r2), header).unwrap() + } else { + if filter_read(&r1.primary, self.mapq) { + AlignmentInfo::from_read(&r1.primary, header).unwrap() } else { None } - })) as Box>, - }); + } + )); let sorted = sort_by_barcode(data, self.temp_dir.clone(), self.chunk_size); UniqueFragments { diff --git a/precellar/src/fragment/deduplicate.rs b/precellar/src/fragment/de_dups.rs similarity index 94% rename from precellar/src/fragment/deduplicate.rs rename to precellar/src/fragment/de_dups.rs index f7e97f7..30f7052 100644 --- a/precellar/src/fragment/deduplicate.rs +++ b/precellar/src/fragment/de_dups.rs @@ -24,14 +24,12 @@ use itertools::Itertools; use noodles::sam::alignment::record::cigar::op::Kind; use noodles::sam::alignment::record::Flags; use noodles::sam::alignment::Record; -use noodles::sam::{ - alignment::record::data::field::{Tag, Value}, - Header, -}; +use noodles::sam::Header; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::Hash; +use crate::barcode::{get_barcode, get_umi}; use crate::fragment::Fragment; // Library type orientation Vizualization according to first strand @@ -340,26 +338,4 @@ where .or_insert(Fragment::from((ali, 1))); }); result -} - -fn get_barcode(rec: &R) -> Result> { - Ok(rec - .data() - .get(&Tag::CELL_BARCODE_ID) - .transpose()? - .and_then(|x| match x { - Value::String(barcode) => Some(barcode.to_string()), - _ => None, - })) -} - -fn get_umi(rec: &R) -> Result> { - Ok(rec - .data() - .get(&Tag::UMI_ID) - .transpose()? - .and_then(|x| match x { - Value::String(umi) => Some(umi.to_string()), - _ => None, - })) -} +} \ No newline at end of file diff --git a/precellar/src/lib.rs b/precellar/src/lib.rs index d539e7a..c66272e 100644 --- a/precellar/src/lib.rs +++ b/precellar/src/lib.rs @@ -1,5 +1,7 @@ pub mod barcode; pub mod align; +pub mod transcript; pub mod fragment; pub mod qc; -pub mod utils; \ No newline at end of file +pub mod utils; +pub mod adapter; \ No newline at end of file diff --git a/precellar/src/qc.rs b/precellar/src/qc.rs index 7400dc4..1fea892 100644 --- a/precellar/src/qc.rs +++ b/precellar/src/qc.rs @@ -4,7 +4,9 @@ use noodles::sam::alignment::{record::data::field::tag::Tag, Record}; use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::ops::{Deref, DerefMut}; +use anyhow::Result; +use crate::align::MultiMap; use crate::fragment::Fragment; #[derive(Debug, Default, Clone)] @@ -45,112 +47,99 @@ impl Display for Metrics { } } -/// Alignment record statistics. #[derive(Debug, Default)] -pub struct FlagStat { - pub read: u64, - pub primary: u64, - pub secondary: u64, - pub supplementary: u64, - pub duplicate: u64, - pub primary_duplicate: u64, - pub mapped: u64, - pub primary_mapped: u64, - pub paired: u64, - pub read_1: u64, - pub read_2: u64, - pub proper_pair: u64, - pub mate_mapped: u64, - pub singleton: u64, - pub mate_reference_sequence_id_mismatch: u64, +struct AlignStat { + total: u64, // Total number of reads + mapped: u64, // Number of mapped reads + high_quality: u64, // Number of high-quality mapped reads: unique, non-duplicate, and mapping quality >= 30 + multimapped: u64, // Number of reads with multiple alignments + duplicate: u64, // Number of duplicate reads } -impl FlagStat { - pub fn add(&mut self, other: &FlagStat) { - self.read += other.read; - self.primary += other.primary; - self.secondary += other.secondary; - self.supplementary += other.supplementary; - self.duplicate += other.duplicate; - self.primary_duplicate += other.primary_duplicate; - self.mapped += other.mapped; - self.primary_mapped += other.primary_mapped; - self.paired += other.paired; - self.read_1 += other.read_1; - self.read_2 += other.read_2; - self.proper_pair += other.proper_pair; - self.mate_mapped += other.mate_mapped; - self.singleton += other.singleton; - self.mate_reference_sequence_id_mismatch += other.mate_reference_sequence_id_mismatch; - } - - pub fn update(&mut self, header: &sam::Header, record: &R) { - self.read += 1; - let flags = record.flags().unwrap(); - +impl AlignStat { + pub fn add(&mut self, record: &MultiMap) -> Result<()> { + self.total += 1; + let flags = record.primary.flags()?; if flags.is_duplicate() { self.duplicate += 1; } - if !flags.is_unmapped() { self.mapped += 1; + if record.others.is_some() { + self.multimapped += 1; + } else { + let q = record.primary.mapping_quality().transpose()?.map(|x| x.get()).unwrap_or(60); + if q >= 30 { + self.high_quality += 1; + } + } } + Ok(()) + } - if flags.is_secondary() { - self.secondary += 1; - } else if flags.is_supplementary() { - self.supplementary += 1; - } else { - self.primary += 1; + pub fn combine(&mut self, other: &Self) { + self.total += other.total; + self.mapped += other.mapped; + self.high_quality += other.high_quality; + self.multimapped += other.multimapped; + self.duplicate += other.duplicate; + } +} - if !flags.is_unmapped() { - self.primary_mapped += 1; - } +#[derive(Debug, Default)] +struct PairAlignStat { + read1: AlignStat, + read2: AlignStat, + proper_pairs: u64, +} - if flags.is_duplicate() { - self.primary_duplicate += 1; - } +impl PairAlignStat { + fn total_reads(&self) -> u64 { + self.read1.total + self.read2.total + } - if flags.is_segmented() { - self.paired += 1; + fn total_pairs(&self) -> u64 { + self.read2.total + } - if flags.is_first_segment() { - self.read_1 += 1; - } + fn total_mapped(&self) -> u64 { + self.read1.mapped + self.read2.mapped + } - if flags.is_last_segment() { - self.read_2 += 1; - } + fn total_high_quality(&self) -> u64 { + self.read1.high_quality + self.read2.high_quality + } - if !flags.is_unmapped() { - if flags.is_properly_segmented() { - self.proper_pair += 1; - } - - if flags.is_mate_unmapped() { - self.singleton += 1; - } else { - self.mate_mapped += 1; - let rec_id = record.mate_reference_sequence_id(header).unwrap().unwrap(); - let mat_id = record.reference_sequence_id(header).unwrap().unwrap(); - - if mat_id != rec_id { - self.mate_reference_sequence_id_mismatch += 1; - } - } - } - } + fn total_duplicate(&self) -> u64 { + self.read1.duplicate + self.read2.duplicate + } + + fn add(&mut self, record: &MultiMap) -> Result<()> { + self.read1.add(record) + } + + fn add_pair(&mut self, record1: &MultiMap, record2: &MultiMap) -> Result<()> { + self.read1.add(record1)?; + self.read2.add(record2)?; + if record1.primary.flags()?.is_properly_segmented() { + self.proper_pairs += 1; } + Ok(()) + } + + fn combine(&mut self, other: &Self) { + self.read1.combine(&other.read1); + self.read2.combine(&other.read2); + self.proper_pairs += other.proper_pairs; } } #[derive(Debug, Default)] pub struct AlignQC { pub(crate) mito_dna: HashSet, // Mitochondrial DNA reference sequence IDs - pub(crate) all_reads_flagstat: FlagStat, - pub(crate) barcoded_reads_flagstat: FlagStat, - pub(crate) hq_flagstat: FlagStat, - pub(crate) mito_flagstat: FlagStat, + stat_all: PairAlignStat, + stat_barcoded: PairAlignStat, + stat_mito: PairAlignStat, pub(crate) num_read1_bases: u64, pub(crate) num_read1_q30_bases: u64, pub(crate) num_read2_bases: u64, @@ -162,82 +151,58 @@ impl AlignQC { self.mito_dna.insert(mito_dna); } - pub fn update(&mut self, record: &R, header: &sam::Header) { - let mut flagstat = FlagStat::default(); - flagstat.update(header, record); - if flagstat.paired == 1 && flagstat.read_2 == 1 { - self.num_read2_bases += record.sequence().len() as u64; - self.num_read2_q30_bases += record + pub fn add(&mut self, header: &sam::Header, record1: &MultiMap, record2: Option<&MultiMap>) -> Result<()> { + let mut stat= PairAlignStat::default(); + + self.num_read1_bases += record1.primary.sequence().len() as u64; + self.num_read1_q30_bases += record1.primary + .quality_scores() + .iter() + .filter(|s| s.as_ref().map(|x| *x >= 30).unwrap_or(false)) + .count() as u64; + + if let Some(record2) = record2 { + self.num_read2_bases += record2.primary.sequence().len() as u64; + self.num_read2_q30_bases += record2.primary .quality_scores() .iter() .filter(|s| s.as_ref().map(|x| *x >= 30).unwrap_or(false)) .count() as u64; + stat.add_pair(record1, record2)?; } else { - self.num_read1_bases += record.sequence().len() as u64; - self.num_read1_q30_bases += record - .quality_scores() - .iter() - .filter(|s| s.as_ref().map(|x| *x >= 30).unwrap_or(false)) - .count() as u64; - } - - self.all_reads_flagstat.add(&flagstat); - let is_hq = record - .mapping_quality() - .map_or(true, |x| x.unwrap().get() >= 30); - if is_hq { - self.hq_flagstat.add(&flagstat); + stat.add(record1)?; } - if record + self.stat_all.combine(&stat); + + if record1.primary .data() .get(&Tag::CELL_BARCODE_ID) .transpose() .unwrap() .is_some() { - self.barcoded_reads_flagstat.add(&flagstat); - if let Some(rid) = record.reference_sequence_id(header) { - if is_hq && self.mito_dna.contains(&rid.unwrap()) { - self.mito_flagstat.add(&flagstat); + self.stat_barcoded.combine(&stat); + if let Some(rid) = record1.primary.reference_sequence_id(header) { + if self.mito_dna.contains(&rid.unwrap()) { + self.stat_mito.combine(&stat); } } } + Ok(()) } pub fn report(&self, metric: &mut Metrics) { - let flagstat_all = &self.all_reads_flagstat; - let flagstat_barcoded = &self.barcoded_reads_flagstat; - let num_reads = flagstat_all.read; - let num_pairs = flagstat_all.paired / 2; - let num_barcoded_reads = flagstat_barcoded.read; - let num_barcoded_pairs = flagstat_barcoded.paired / 2; - let mapped_pairs = flagstat_barcoded.mate_mapped / 2; - let is_paired = num_pairs > 0; - - let fraction_unmapped = if is_paired { - 1.0 - mapped_pairs as f64 / num_barcoded_pairs as f64 - } else { - 1.0 - flagstat_barcoded.mapped as f64 / num_barcoded_reads as f64 - }; - let valid_barcode = if is_paired { - num_barcoded_pairs as f64 / num_pairs as f64 - } else { - num_barcoded_reads as f64 / num_reads as f64 - }; - let fraction_confidently_mapped = if is_paired { - (self.hq_flagstat.paired / 2) as f64 / num_pairs as f64 - } else { - self.hq_flagstat.read as f64 / num_reads as f64 - }; - let fraction_nonnuclear = if is_paired { - (self.mito_flagstat.paired / 2) as f64 / num_pairs as f64 - } else { - self.mito_flagstat.read as f64 / num_reads as f64 - }; + let stat_all = &self.stat_all; + let stat_barcoded = &self.stat_barcoded; + + let fraction_unmapped = 1.0 - stat_barcoded.total_mapped() as f64 / stat_barcoded.total_reads() as f64; + let valid_barcode = stat_barcoded.total_reads() as f64 / stat_all.total_reads() as f64; + let fraction_confidently_mapped = stat_barcoded.total_high_quality() as f64 / stat_barcoded.total_reads() as f64; + let fraction_nonnuclear = self.stat_mito.total_reads() as f64 / stat_barcoded.total_reads() as f64; - metric.insert("sequenced_reads".to_string(), num_reads as f64); - metric.insert("sequenced_read_pairs".to_string(), num_pairs as f64); + metric.insert("sequenced_reads".to_string(), stat_all.total_reads() as f64); + metric.insert("sequenced_read_pairs".to_string(), stat_all.total_pairs() as f64); metric.insert( "frac_q30_bases_read1".to_string(), self.num_read1_q30_bases as f64 / self.num_read1_bases as f64, diff --git a/precellar/src/transcript/annotate.rs b/precellar/src/transcript/annotate.rs new file mode 100644 index 0000000..49f624a --- /dev/null +++ b/precellar/src/transcript/annotate.rs @@ -0,0 +1,470 @@ +use crate::align::MultiMapR; +/// This module provides utilities for annotating bam records by mapping them to a transcriptome. +/// It supports both single-end and paired-end alignments and uses transcript annotations for gene-level +/// and exon-level classification. +use crate::transcript::{Gene, SpliceSegments, Transcript}; + +use anyhow::{ensure, Result}; +use bed_utils::bed::map::GIntervalMap; +use bed_utils::bed::GenomicRange; +use noodles::gtf::record::strand::Strand; +use noodles::sam; +use noodles::sam::alignment::record_buf::Cigar; +use noodles::sam::alignment::record_buf::RecordBuf; +use serde::{Deserialize, Serialize}; +use std::cmp; +use std::collections::{BTreeMap, HashSet}; + +#[derive(Eq, PartialEq, Debug)] +pub struct Annotation { + pub aln_sense: Vec, + pub aln_antisense: Vec, + pub genes: Vec, + pub region: AnnotationRegion, + pub rescued: bool, + pub chrom: String, +} + +/// Represents an annotated BAM record. +#[derive(Debug)] +pub enum AnnotatedAlignment { + /// Represents a single-end mapped read with annotation. + SeMapped(Annotation), + /// Represents a paired-end mapped read with annotations for each end. + PeMapped( + Annotation, + Annotation, + PairAnnotationData, + ), +} + +impl AnnotatedAlignment { + /// Returns references to the annotations for single-end or paired-end reads. + pub fn annotation(&self) -> (Option<&Annotation>, Option<&Annotation>) { + match self { + AnnotatedAlignment::SeMapped(ref anno) => (Some(anno), None), + AnnotatedAlignment::PeMapped(ref anno1, ref anno2, _) => { + (Some(anno1), Some(anno2)) + } + } + } + + /// Marks the alignment as rescued. + fn set_rescued(&mut self) { + match self { + AnnotatedAlignment::SeMapped(ref mut anno) => anno.rescued = true, + AnnotatedAlignment::PeMapped(ref mut anno1, ref mut anno2, _) => { + anno1.rescued = true; + anno2.rescued = true; + } + } + } +} + +/// Manages the annotation of alignments using transcriptome data. +#[derive(Debug, Clone)] +pub struct AlignmentAnnotator { + /// Map of genomic intervals to transcripts. + pub(crate) transcripts: GIntervalMap, + /// Indicates the strandedness of the chemistry used in the experiment. + chemistry_strandedness: Strand, + /// Number of bases to trim from intergenic alignments. + intergenic_trim_bases: u64, + /// Number of bases to trim from intronic alignments. + intronic_trim_bases: u64, + /// Number of bases to trim from junction alignments. + junction_trim_bases: u64, + /// Minimum overlap fraction required for a region to be considered mapped. + region_min_overlap: f64, +} + +impl AlignmentAnnotator { + /// Creates a new `AlignmentAnnotator` with the provided transcripts. + pub fn new(transcripts: impl IntoIterator) -> Self { + let transcripts = transcripts + .into_iter() + .map(|x| (GenomicRange::new(&x.chrom, x.start, x.end), x)) + .collect(); + Self { + transcripts, + chemistry_strandedness: Strand::Forward, + intergenic_trim_bases: 0, + intronic_trim_bases: 0, + junction_trim_bases: 0, + region_min_overlap: 0.5, + } + } + + /// Annotate the alignments by mapping them to the transcriptome. If multiple + /// alignments are present, we will try to find the confident ones and promote + /// them to primary. A read may align to multiple transcripts and genes, but + /// it is only considered confidently mapped to the transcriptome if it is + /// mapped to a single gene. The confident alignment will be returned if found. + /// + /// # Arguments + /// * `header` - Reference to the SAM header. + /// * `rec` - Vector of single-end alignment records. + pub fn annotate_alignments_se( + &self, + header: &sam::Header, + rec: MultiMapR, + ) -> Option { + let results = rec + .iter() + .filter_map(|rec| self.annotate_alignment_se(header, rec)) + .collect::>(); + rescue_alignments_se(results) + } + + /// Annotates a batch of paired-end alignments. + /// + /// # Arguments + /// * `header` - Reference to the SAM header. + /// * `rec1` - Vector of first-end alignment records. + /// * `rec2` - Vector of second-end alignment records. + pub fn annotate_alignments_pe( + &self, + header: &sam::Header, + rec1: MultiMapR, + rec2: MultiMapR, + ) -> Option { + let pair_improper = rec1.len() != rec2.len(); + let result: Vec<_> = rec1 + .iter() + .zip(rec2.iter()) + .filter_map(|(r1, r2)| self.annotate_alignment_pe(header, r1, r2)) + .collect(); + rescue_alignments_pe(result) + } + + /// Annotates a single-end alignment record. + fn annotate_alignment_se(&self, header: &sam::Header, rec: &RecordBuf) -> Option { + if rec.flags().is_unmapped() { + None + } else { + let anno = self.annotate_alignment(header, rec).unwrap(); + Some(AnnotatedAlignment::SeMapped(anno)) + } + } + + /// Create annotation for a pair of alignments + fn annotate_alignment_pe( + &self, + header: &sam::Header, + rec1: &RecordBuf, + rec2: &RecordBuf, + ) -> Option { + // STAR _shouldn't_ return pairs where only a single end is mapped, + // but if it does, consider the pair unmapped + if rec1.flags().is_unmapped() || rec2.flags().is_unmapped() { + None + } else { + let anno1 = self.annotate_alignment(header, rec1).unwrap(); + let anno2 = self.annotate_alignment(header, rec2).unwrap(); + let annop = PairAnnotationData::from_pair(&anno1, &anno2); + Some(AnnotatedAlignment::PeMapped(anno1, anno2, annop)) + } + } + + fn annotate_alignment(&self, header: &sam::Header, read: &RecordBuf) -> Result { + ensure!( + !read.flags().is_unmapped(), + "Unmapped alignments cannot be annotated" + ); + let chrom = read.reference_sequence(header).unwrap()?.0; + let chrom = std::str::from_utf8(chrom)?.to_string(); + + let region = GenomicRange::new( + &chrom, + read.alignment_start().unwrap().get() as u64, + read.alignment_end().unwrap().get() as u64, + ); + let alignments = self + .transcripts + .find(®ion) + .flat_map(|(_, transcript)| self.align_to_transcript(read, transcript)) + .collect::>(); + + let mut seen_genes = HashSet::new(); + let mut transcripts = BTreeMap::new(); + let mut antisense = BTreeMap::new(); + let annotation_region; + if alignments.is_empty() { + annotation_region = AnnotationRegion::Intergenic; + } else if alignments.iter().any(|x| x.is_exonic()) { + annotation_region = AnnotationRegion::Exonic; + // Check if there are transcriptome compatible alignments + alignments.into_iter().rev().for_each(|aln| { + if let Some(tx_align) = &aln.exon_align { + match aln.strand { + Strand::Forward => { + // Transcript sense alignment + seen_genes.insert(aln.gene.clone()); + transcripts.insert(tx_align.id.clone(), aln); + } + Strand::Reverse => { + // Transcript anti-sense alignment + antisense.insert(tx_align.id.clone(), aln); + } + } + } + }); + } else { + annotation_region = AnnotationRegion::Intronic; + alignments + .into_iter() + .rev() + .for_each(|aln| match aln.strand { + Strand::Forward => { + seen_genes.insert(aln.gene.clone()); + transcripts.insert(aln.gene.id.clone(), aln); + } + Strand::Reverse => { + antisense.insert(aln.gene.id.clone(), aln); + } + }); + } + + let mut annotation = Annotation { + aln_sense: transcripts.into_values().collect::>(), + aln_antisense: antisense.into_values().collect::>(), + genes: seen_genes.into_iter().collect::>(), + region: annotation_region, + rescued: false, + chrom, + }; + // Sorting this makes life easier later. + annotation.genes.sort_unstable(); + + Ok(annotation) + } + + /// Aligns a read to a transcript and determines the region type (exonic, intronic, or intergenic). + fn align_to_transcript( + &self, + read: &RecordBuf, + transcript: &Transcript, + ) -> Option { + // figure out coordinates + let tx_start = transcript.start; + let tx_end = transcript.end; + let genomic_start = read.alignment_start().unwrap().get() as u64; + let genomic_end = read.alignment_end().unwrap().get() as u64; + let splice_segments = SpliceSegments::from(read); + + let is_exonic = splice_segments.is_exonic(transcript, self.region_min_overlap); + if is_exonic || get_overlap(genomic_start, genomic_end, tx_start, tx_end) >= 1.0 { + // compute strand + let tx_reverse_strand = transcript.strand == Strand::Reverse; + let flags = read.flags(); + let mut read_reverse_strand = flags.is_reverse_complemented(); + if flags.is_segmented() && flags.is_last_segment() { + read_reverse_strand = !read_reverse_strand; + }; + let is_antisense = match self.chemistry_strandedness { + Strand::Forward => tx_reverse_strand != read_reverse_strand, + Strand::Reverse => tx_reverse_strand == read_reverse_strand, + }; + let tx_strand = if is_antisense { + Strand::Reverse + } else { + Strand::Forward + }; + + let gene = transcript.gene.clone(); + let mut alignment = TranscriptAlignment { + gene: gene.clone(), + strand: tx_strand, + exon_align: None, + }; + + if is_exonic { + // compute offsets + let mut tx_offset = transcript.get_offset(genomic_start).unwrap().max(0) as u64; + let tx_length = transcript.len(); + + // align the read to the exons + if let Some((mut tx_cigar, tx_aligned_bases)) = splice_segments.align_junctions( + transcript, + self.junction_trim_bases, + self.intergenic_trim_bases, + self.intronic_trim_bases, + ) { + // flip reverse strand + if tx_reverse_strand { + tx_offset = tx_length - (tx_offset + tx_aligned_bases); + tx_cigar.as_mut().reverse(); + }; + alignment = TranscriptAlignment { + gene, + strand: tx_strand, + exon_align: Some(TxAlignProperties { + id: transcript.id.clone(), + pos: tx_offset, + cigar: tx_cigar, + alen: tx_aligned_bases, + }), + }; + } + } + Some(alignment) + } else { + None + } + } +} + +#[derive(Eq, PartialEq, Debug)] +pub struct PairAnnotationData { + /// Genes associated with the pair of alignments. + pub genes: Vec, +} + +impl PairAnnotationData { + /// Annotate a pair of alignments + /// Take the intersection of the non-empty gene sets of the mates + pub fn from_pair(anno1: &Annotation, anno2: &Annotation) -> PairAnnotationData { + let genes = match (!anno1.genes.is_empty(), !anno2.genes.is_empty()) { + (true, false) => anno1.genes.clone(), + (false, true) => anno2.genes.clone(), + _ if anno1.chrom == anno2.chrom => anno1 + .genes + .iter() + .collect::>() + .intersection(&anno2.genes.iter().collect::>()) + .map(|x| (*x).clone()) + .collect(), + _ => vec![], + }; + PairAnnotationData { genes } + } +} + +/// Use transcriptome alignments to promote a single genome alignment +/// when none are confidently mapped to the genome. +/// Returns true if rescue took place. +fn rescue_alignments_se(mut recs: Vec) -> Option { + let n = recs.len(); + if n == 0 { + None + } else if n == 1 { + recs.pop() + } else { + let mut promote_index: Option = None; + let mut seen_genes = HashSet::new(); + + for (i, rec) in recs.iter().enumerate() { + match rec { + AnnotatedAlignment::SeMapped(anno) => { + // Only consider transcriptomic alignments for rescue + if anno.aln_sense.iter().any(|x| x.is_exonic()) { + let genes = &anno.genes; + // Track which record/record-pair we should promote; + // Take the first record/pair with 1 gene + if genes.len() == 1 { + promote_index = promote_index.or(Some(i)); + } + + // Track number of distinct genes we're aligned to + seen_genes.extend(genes); + } + } + _ => unimplemented!("Only single-end alignments can be rescued"), + } + } + + // The alignment can be rescued if there is only one uniquely mapped gene + if seen_genes.len() == 1 && promote_index.is_some() { + let mut rec = recs.swap_remove(promote_index.unwrap()); + rec.set_rescued(); + Some(rec) + } else { + None + } + } +} + +/// Attempts to rescue paired-end alignments using transcript annotations. +/// Returns true if rescue took place. +fn rescue_alignments_pe(mut pairs: Vec) -> Option { + let n = pairs.len(); + if n == 0 { + None + } else if n == 1 { + pairs.pop() + } else { + // Check if rescue is appropriate and determine which record to promote + let mut seen_genes = HashSet::new(); + let mut promote_index: Option = None; + + for (i, pair) in pairs.iter_mut().enumerate() { + match pair { + AnnotatedAlignment::PeMapped(_, _, anno) => { + let genes = &anno.genes; + + // Track which record/record-pair we should promote; + // Take the first record/pair with 1 gene + if genes.len() == 1 { + promote_index = promote_index.or(Some(i)); + } + + // Track number of distinct genes we're aligned to + seen_genes.extend(genes); + } + _ => unimplemented!(), + } + } + + // The alignment can be rescued if there is only one uniquely mapped gene + if seen_genes.len() == 1 && promote_index.is_some() { + let mut pair = pairs.swap_remove(promote_index.unwrap()); + pair.set_rescued(); + Some(pair) + } else { + None + } + } +} + +#[derive(Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Hash)] +pub enum AnnotationRegion { + Exonic, + Intronic, + Intergenic, +} + +#[derive(Eq, PartialEq, Debug)] +pub struct TranscriptAlignment { + pub gene: Gene, + pub strand: Strand, + pub exon_align: Option, +} + +impl TranscriptAlignment { + pub fn is_exonic(&self) -> bool { + self.exon_align.is_some() + } + + pub fn is_intronic(&self) -> bool { + !self.is_exonic() + } +} + +// These quantities are well defined for a valid transcriptomic alignment +#[derive(Eq, PartialEq, Debug)] +pub struct TxAlignProperties { + pub id: String, + pub pos: u64, + pub cigar: Cigar, + pub alen: u64, +} + +/// Fraction of read interval covered by ref interval +fn get_overlap(read_start: u64, read_end: u64, ref_start: u64, ref_end: u64) -> f64 { + let mut overlap_bases = + cmp::min(ref_end, read_end) as f64 - cmp::max(ref_start, read_start) as f64; + if overlap_bases < 0.0 { + overlap_bases = 0.0; + } + overlap_bases / ((read_end - read_start) as f64) +} diff --git a/precellar/src/transcript/de_dups.rs b/precellar/src/transcript/de_dups.rs new file mode 100644 index 0000000..90ec328 --- /dev/null +++ b/precellar/src/transcript/de_dups.rs @@ -0,0 +1,74 @@ +use std::collections::{BTreeMap, HashMap, HashSet}; + +use super::quantification::GeneAlignment; + +type Gene = usize; + +pub fn count_unique_umi(alignments: I) -> BTreeMap +where + I: IntoIterator, +{ + fn get_uniq_counts(counts: HashMap<(Vec, Gene), u64>) -> BTreeMap { + let umi_correction= correct_umis(&counts); + + let mut uniq_counts: HashMap> = HashMap::new(); + counts.keys().for_each(|(umi, gene)| { + let corrected_umi = umi_correction.get(&(umi, *gene)).unwrap_or(umi); + uniq_counts.entry(*gene).or_insert(HashSet::new()).insert(corrected_umi); + }); + + uniq_counts.into_iter().map(|(gene, umis)| (gene, umis.len())).collect() + } + + let mut umigene_counts = HashMap::new(); + alignments.into_iter().for_each(|alignment| { + let gene = alignment.idx; + let umi = alignment.umi.unwrap().into_bytes(); + *umigene_counts.entry((umi, gene)).or_insert(0) += 1u64; + }); + + get_uniq_counts(umigene_counts) +} + +/// Within each gene, correct Hamming-distance-one UMIs +fn correct_umis<'a>(umigene_counts: &'a HashMap<(Vec, Gene), u64>) -> HashMap<(&'a [u8], Gene), Vec> { + let nucs = b"ACGT"; + + let mut corrections = HashMap::new(); + + for ((umi, gene), orig_count) in umigene_counts { + let mut test_umi = umi.clone(); + + let mut best_dest_count = *orig_count; + let mut best_dest_umi = umi.to_vec(); + + for pos in 0..umi.len() { + // Try each nucleotide at this position + for test_char in nucs { + if *test_char == umi[pos] { + // Skip the identitical nucleotide + continue; + } + test_umi[pos] = *test_char; + + // Test for the existence of this mutated UMI + let test_count = *umigene_counts.get(&(test_umi.clone(), *gene)).unwrap_or(&0u64); + + // If there's a 1-HD UMI w/ greater count, move to that UMI. + // If there's a 1-HD UMI w/ equal count, move to the lexicographically larger UMI. + if test_count > best_dest_count + || (test_count == best_dest_count && test_umi > best_dest_umi) + { + best_dest_umi = test_umi.clone(); + best_dest_count = test_count; + } + } + // Reset this position to the unmutated sequence + test_umi[pos] = umi[pos]; + } + if *umi != best_dest_umi { + corrections.insert((umi.as_slice(), *gene), best_dest_umi); + } + } + corrections +} \ No newline at end of file diff --git a/precellar/src/transcript/mod.rs b/precellar/src/transcript/mod.rs new file mode 100644 index 0000000..e0501cd --- /dev/null +++ b/precellar/src/transcript/mod.rs @@ -0,0 +1,8 @@ +mod quantification; +mod annotate; +mod transcriptome; +pub(crate) mod de_dups; + +pub use quantification::Quantifier; +pub use transcriptome::{Transcript, Gene, SpliceSegments, Exon, Exons}; +pub use annotate::{AlignmentAnnotator, AnnotatedAlignment}; \ No newline at end of file diff --git a/precellar/src/transcript/quantification.rs b/precellar/src/transcript/quantification.rs new file mode 100644 index 0000000..2b91719 --- /dev/null +++ b/precellar/src/transcript/quantification.rs @@ -0,0 +1,182 @@ +use bed_utils::extsort::ExternalSorterBuilder; +use indexmap::IndexMap; +use itertools::Itertools; +use noodles::sam::Header; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +use crate::{align::MultiMapR, transcript::Gene}; + +use super::{ + annotate::AnnotationRegion, de_dups::count_unique_umi, AlignmentAnnotator, AnnotatedAlignment, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct GeneAlignment { + pub idx: usize, + pub umi: Option, + pub align_type: AnnotationRegion, +} + +#[derive(Debug)] +pub struct Quantifier { + annotator: AlignmentAnnotator, + genes: IndexMap, + temp_dir: Option, + chunk_size: usize, +} + +impl Quantifier { + pub fn new(annotator: AlignmentAnnotator) -> Self { + let genes = annotator + .transcripts + .iter() + .map(|(_, t)| { + let g = t.gene.clone(); + (g.id.clone(), g) + }) + .collect(); + Self { + annotator, + genes, + temp_dir: None, + chunk_size: 50000000, + } + } + + pub fn quantify<'a, I, P>(&'a self, header: &'a Header, records: I, out_dir: P) + where + I: Iterator)>> + 'a, + P: AsRef, + { + // create output files + std::fs::create_dir_all(&out_dir).unwrap(); + + let mut output_feature = seqspec::utils::create_file( + out_dir.as_ref().join("features.tsv.gz"), + Some(seqspec::utils::Compression::Gzip), + Some(7), + 8, + ) + .unwrap(); + self.genes.values().for_each(|g| { + writeln!(output_feature, "{}\t{}", g.id, g.name).unwrap(); + }); + + let mut output_barcode = seqspec::utils::create_file( + out_dir.as_ref().join("barcodes.tsv.gz"), + Some(seqspec::utils::Compression::Gzip), + Some(7), + 8, + ) + .unwrap(); + + let mut output_mat = seqspec::utils::create_file( + out_dir.as_ref().join("matrix.mtx.gz"), + Some(seqspec::utils::Compression::Gzip), + Some(7), + 8, + ) + .unwrap(); + + let mut n_barcodes = 0usize; + let alignments = records.flat_map(|recs| { + recs.into_iter() + .filter_map(|(r1, r2)| self.make_gene_alignment(header, r1, r2)) + }); + let alignments_barcode = + sort_alignments(alignments, self.temp_dir.as_ref(), self.chunk_size) + .chunk_by(|x| x.0.clone()); + let counts = alignments_barcode + .into_iter() + .enumerate() + .flat_map(|(i, (barcode, alignments))| { + n_barcodes += 1; + let counts = count_unique_umi(alignments.map(|(_, a)| a)); + writeln!(output_barcode, "{}", barcode).unwrap(); + counts.into_iter().map(move |(gene, count)| + [gene + 1, i + 1, count] + ) + }) + .collect::>(); + + writeln!(output_mat, "%%MatrixMarket matrix coordinate integer general\n%\n{} {} {}", self.genes.len(), n_barcodes, counts.len()).unwrap(); + for count in counts { + writeln!(output_mat, "{} {} {}", count[0], count[1], count[2]).unwrap(); + } + + } + + fn make_gene_alignment( + &self, + header: &Header, + rec1: MultiMapR, + rec2: Option, + ) -> Option<(String, GeneAlignment)> { + let barcode = rec1.barcode().unwrap()?; + let umi = rec1.umi().unwrap(); + let anno = if let Some(rec2) = rec2 { + self.annotator.annotate_alignments_pe(header, rec1, rec2) + } else { + self.annotator.annotate_alignments_se(header, rec1) + }?; + + let gene_id; + let align_type; + + match anno { + AnnotatedAlignment::PeMapped(a1, a2, anno) => { + let genes = anno.genes; + if genes.len() != 1 { + return None; + } + let gene = genes.iter().next().unwrap(); + align_type = match (a1.region, a2.region) { + (AnnotationRegion::Intronic, _) => AnnotationRegion::Intronic, + (_, AnnotationRegion::Intronic) => AnnotationRegion::Intronic, + _ => AnnotationRegion::Exonic, + }; + gene_id = self.genes.get_full(&gene.id).unwrap().0; + } + AnnotatedAlignment::SeMapped(anno) => { + let genes = anno.genes; + if genes.len() != 1 { + return None; + } + let gene = genes.iter().next().unwrap(); + align_type = anno.region; + gene_id = self.genes.get_full(&gene.id).unwrap().0; + } + } + + let alignment = GeneAlignment { + idx: gene_id, + umi, + align_type, + }; + Some((barcode, alignment)) + } +} + +fn sort_alignments( + alignments: I, + temp_dir: Option

, + chunk_size: usize, +) -> impl ExactSizeIterator +where + I: Iterator, + P: AsRef, +{ + let mut sorter = ExternalSorterBuilder::new() + .with_chunk_size(chunk_size) + .with_compression(2); + if let Some(tmp) = temp_dir { + sorter = sorter.with_tmp_dir(tmp); + } + sorter + .build() + .unwrap() + .sort_by(alignments, |a, b| a.0.cmp(&b.0)) + .unwrap() + .map(|x| x.unwrap()) +} diff --git a/precellar/src/transcript/transcriptome.rs b/precellar/src/transcript/transcriptome.rs new file mode 100644 index 0000000..9a05eaf --- /dev/null +++ b/precellar/src/transcript/transcriptome.rs @@ -0,0 +1,476 @@ +use anyhow::{bail, ensure, Result}; +use noodles::gtf::record::strand::Strand; +use noodles::sam::alignment::record::cigar::op::Kind; +use noodles::sam::alignment::record::cigar::Op; +use noodles::sam::alignment::record_buf::Cigar; +use noodles::sam::alignment::record_buf::RecordBuf; +use std::cmp; + +/// 0-based, half-open +#[derive(Debug, Clone)] +pub struct Transcript { + pub id: String, + pub chrom: String, + pub start: u64, + pub end: u64, // exclusive + pub strand: Strand, + pub gene: Gene, + exons: Exons, +} + +impl TryFrom for Transcript { + type Error = anyhow::Error; + + fn try_from(transcript: star_aligner::transcript::Transcript) -> Result { + let start = transcript.start; + let end = transcript.end; + let strand = match transcript.strand { + star_aligner::transcript::Strand::Forward => Strand::Forward, + star_aligner::transcript::Strand::Reverse => Strand::Reverse, + _ => bail!("Strand must be Forward or Reverse"), + }; + let exons = Exons::new(transcript.exons.iter().map(|exon| { + assert!(exon.start < exon.end, "Exon start must be less than exon end"); + assert!(exon.start >= start, "Exon start must be greater than transcript start"); + assert!(exon.end <= end, "Exon end must be less than transcript end"); + (exon.start, exon.end) + }))?; + Ok(Self { + id: transcript.id, + chrom: transcript.chrom, + start, + end, + strand, + gene: Gene { + id: transcript.gene_id, + name: transcript.gene_name, + }, + exons, + }) + } +} + +impl Transcript { + /// Transcript length is the sum of the lengths of the exons. + pub fn len(&self) -> u64 { + self.exons().iter().map(|x| x.len()).sum() + } + + pub fn exons(&self) -> &[Exon] { + self.exons.as_ref() + } + + /// Convert a coordinate in the genome to a coordinate in the exons/transcript. + /// The coordinate starts at the beginning of the transcript. + pub fn get_offset(&self, coord: u64) -> Option { + let mut cum_len = 0; + for exon in &self.exons.0 { + if coord < exon.end { + return Some((cum_len + coord) as i64 - exon.start as i64); + } + cum_len += exon.len(); + } + None + } +} + +#[derive(Debug, Clone)] +pub struct Exons(Vec); + +impl Exons { + pub fn new(iter: impl IntoIterator) -> Result { + let mut prev_end = None; + let exon = iter + .into_iter() + .map(|(start, end)| { + ensure!( + prev_end.is_none() || start >= prev_end.unwrap(), + "Exons must be non-overlapping and in order" + ); + ensure!( + end > start, + "End coordinate must be greater than start coordinate" + ); + prev_end = Some(end); + Ok(Exon { start, end }) + }) + .collect::>>()?; + Ok(Self(exon)) + } +} + +impl AsRef<[Exon]> for Exons { + fn as_ref(&self) -> &[Exon] { + &self.0 + } +} + +/// Exon coordinates are 0-based, half-open. +#[derive(Eq, PartialEq, Debug, Clone, Ord, PartialOrd)] +pub struct Exon { + start: u64, + end: u64, +} + +impl Exon { + pub fn len(&self) -> u64 { + self.end - self.start + } +} + +#[derive(Hash, Eq, PartialEq, Debug, Clone, Ord, PartialOrd)] +pub struct Gene { + pub id: String, + pub name: String, +} + +/// SpliceSegment represents a contiguous block of cigar operations not containing +/// any "Skip" sections. The SpliceSegment is 0-based, half-open with respect to the reference. +#[derive(Debug)] +struct SpliceSegment { + start: u64, + end: u64, + cigar: Cigar, +} + +/// SpliceSegments is used to represent the alignment of a read to a transcript. +/// It consists of the left and right clipping operations, and a list of SpliceSegments. +pub struct SpliceSegments { + left_clip: Cigar, + right_clip: Cigar, + segments: Vec, +} + +impl SpliceSegments { + /// The leftmost position of all segments. + pub fn start(&self) -> u64 { + self.segments.first().map_or(0, |segment| segment.start) + } + + /// The rightmost position of all segments. + pub fn end(&self) -> u64 { + self.segments.last().map_or(0, |segment| segment.end) + } + + /// Determine if the read aligns to exonic regions of a transcript. A read is considered exonic + /// if it aligns to all exons with at least `min_overlap_frac` overlap. + pub fn is_exonic(&self, transcript: &Transcript, min_overlap_frac: f64) -> bool { + self.segments.iter().all(|segment| { + // find first exon that ends to the right of the segment start + let idx = transcript + .exons() + .binary_search_by_key(&segment.start, |ex| ex.end - 1) + .unwrap_or_else(std::convert::identity); + transcript.exons().get(idx).map_or(false, |exon| { + get_overlap(segment.start, segment.end, exon.start, exon.end) >= min_overlap_frac + }) + }) + } + + /// Align to a transcript. Returns the aligned cigar and the number of aligned bases. + pub fn align_junctions( + &self, + transcript: &Transcript, + tolerance: u64, + intergenic_trim_bases: u64, + intronic_trim_bases: u64, + ) -> Option<(Cigar, u64)> { + let (ex_start, ex_end) = find_exons( + &transcript.exons(), + self.start(), + self.end(), + intergenic_trim_bases, + intronic_trim_bases, + )?; + self._align_junctions_helper(&transcript.exons()[ex_start..=ex_end], tolerance) + } + + /// Align the read to the exons. Returns the aligned cigar and the number of aligned bases. + fn _align_junctions_helper(&self, exons: &[Exon], tolerance: u64) -> Option<(Cigar, u64)> { + // check if the number of segments matches the number of exons + if self.segments.len() != exons.len() { + return None; + } + + let mut full_cigar = self.left_clip.clone(); + let mut aligned_bases = 0; + + for i in 0..self.segments.len() { + let curr_segment = &self.segments[i]; + let curr_exon = &exons[i]; + aligned_bases += curr_exon.len(); + let mut tmp_cigar = curr_segment.cigar.clone(); + + // align the start + let start_diff = curr_exon.start as i64 - curr_segment.start as i64; + if i == 0 { + // first segment + if start_diff > 0 { + // overhang -> softclip + tmp_cigar = mask_read_bases( + &mut tmp_cigar, + Op::new(Kind::SoftClip, start_diff as usize), + false, + ); + } else if start_diff < 0 { + // underhang -> decrement aligned bases + aligned_bases -= start_diff.unsigned_abs(); + } + } else if start_diff.unsigned_abs() > tolerance { + return None; // can't align properly + } else if start_diff > 0 { + // overhang -> mark as insertion + tmp_cigar = mask_read_bases( + &mut tmp_cigar, + Op::new(Kind::Insertion, start_diff as usize), + false, + ); + } else if start_diff < 0 { + // underhang -> mark as deletion + tmp_cigar = mark_deleted_ref_bases( + &mut tmp_cigar, + start_diff.unsigned_abs().try_into().unwrap(), + false, + ); + } + + // align the end + let end_diff = curr_segment.end as i64 - curr_exon.end as i64 - 1; + if i == self.segments.len() - 1 { + // last segment + if end_diff > 0 { + // overhang -> softclip + tmp_cigar = mask_read_bases( + &mut tmp_cigar, + Op::new(Kind::SoftClip, end_diff as usize), + true, + ); + } else if end_diff < 0 { + // underhang -> decrement aligned bases + aligned_bases -= end_diff.unsigned_abs(); + } + } else if end_diff.unsigned_abs() > tolerance { + return None; // can't align properly + } else if end_diff > 0 { + // overhang -> mark as insertion + tmp_cigar = mask_read_bases( + &mut tmp_cigar, + Op::new(Kind::Insertion, end_diff as usize), + true, + ); + } else if end_diff < 0 { + // underhang -> mark as deletion + tmp_cigar = mark_deleted_ref_bases( + &mut tmp_cigar, + end_diff.unsigned_abs().try_into().unwrap(), + true, + ); + } + + // extend + full_cigar.extend(Vec::from(tmp_cigar).into_iter()); + } + full_cigar.extend(self.right_clip.as_ref().iter().copied()); + + Some((full_cigar, aligned_bases)) + } +} + +impl From<&RecordBuf> for SpliceSegments { + fn from(read: &RecordBuf) -> Self { + let cigar = read.cigar(); + let alignment_start = read.alignment_start().unwrap().get(); + + let mut left_clip: Vec = Vec::new(); + let mut right_clip: Vec = Vec::new(); + let mut splice_segments: Vec = Vec::new(); + let mut seen_nonclips = false; // whether we've seen non-clip bases yet + let mut curr_segment = SpliceSegment { + start: alignment_start as u64, + end: alignment_start as u64, + cigar: Vec::new().into(), + }; + + for c in cigar.as_ref() { + match c.kind() { + Kind::HardClip | Kind::SoftClip => { + if seen_nonclips { + right_clip.push(*c); + } else { + left_clip.push(*c); + } + } + Kind::Skip => { + seen_nonclips = true; + let next_start = curr_segment.end + c.len() as u64; + splice_segments.push(curr_segment); + curr_segment = SpliceSegment { + start: next_start, + end: next_start, + cigar: Vec::new().into(), + }; + } + Kind::Insertion => { + seen_nonclips = true; + curr_segment.cigar.as_mut().push(*c); + } + Kind::Match | Kind::Deletion | Kind::SequenceMatch | Kind::SequenceMismatch => { + seen_nonclips = true; + curr_segment.end += c.len() as u64; + curr_segment.cigar.as_mut().push(*c); + } + Kind::Pad => unreachable!(), + } + } + splice_segments.push(curr_segment); + + Self { + left_clip: left_clip.into(), + right_clip: right_clip.into(), + segments: splice_segments, + } + } +} + +/// Fraction of read interval covered by ref interval +fn get_overlap(read_start: u64, read_end: u64, ref_start: u64, ref_end: u64) -> f64 { + let mut overlap_bases = + cmp::min(ref_end, read_end) as f64 - cmp::max(ref_start, read_start) as f64; + if overlap_bases < 0.0 { + overlap_bases = 0.0; + } + overlap_bases / ((read_end - read_start) as f64) +} + +/// Find the exons that the read aligns to. Returns the indices of the first and last exons. +fn find_exons( + exon_info: &[Exon], + read_start: u64, + read_end: u64, // inclusive + intergenic_trim_bases: u64, + intronic_trim_bases: u64, +) -> Option<(usize, usize)> { + // find first exon that ends to the right of the read start + let ex_start = exon_info + .binary_search_by_key(&read_start, |ex| ex.end - 1) + .map_or_else(|i| i, |i| i); + // find first exon that starts to the left of the read end + let ex_end = exon_info + .binary_search_by_key(&read_end, |ex| ex.start) + .map_or_else(|i| if i > 0 { Some(i - 1) } else { None }, |i| Some(i))?; + if ex_start >= exon_info.len() { + return None; + } + + let starting_exon = &exon_info[ex_start]; + let ending_exon = &exon_info[ex_end]; + + if read_start < starting_exon.start { + // read overhangs exon on the left + let overhang = starting_exon.start - read_start; + let trim_bases = if ex_start == 0 { + intergenic_trim_bases + } else { + intronic_trim_bases + }; + if overhang > trim_bases { + // too much overhang + return None; + }; + } + + if read_end > ending_exon.end { + // read overhangs exon on the right + let overhang = read_end - ending_exon.end; + let trim_bases = if ex_end >= exon_info.len() { + intergenic_trim_bases + } else { + intronic_trim_bases + }; + if overhang > trim_bases { + // too much overhang + return None; + }; + } + + Some((ex_start, ex_end)) +} + +fn mask_read_bases(cigar: &mut Cigar, mask: Op, reverse: bool) -> Cigar { + // NOTE: this assumes that refskips have been removed + let mut new_cigar = Vec::new(); + let mask_len = mask.len(); + let mut consumed_bases = 0; + new_cigar.push(mask); + if reverse { + cigar.as_mut().reverse(); + } + for c in cigar.as_ref() { + if consumed_bases < mask_len { + // this op should be masked + let read_bases = match c.kind() { + Kind::Deletion => 0, // deletions don't consume read bases + _ => c.len(), + }; + if consumed_bases + read_bases >= mask_len { + let truncated = Op::new(c.kind(), read_bases + consumed_bases - mask_len); + new_cigar.push(truncated); + }; + consumed_bases += read_bases; + } else { + // just copy the op + new_cigar.push(*c); + }; + } + if reverse { + new_cigar.reverse(); + } + new_cigar.into() +} + +fn mark_deleted_ref_bases(cigar: &mut Cigar, del_len: usize, reverse: bool) -> Cigar { + let del = Op::new(Kind::Deletion, del_len); + if reverse { + let mut new_cigar: Cigar = vec![del].into(); + new_cigar.extend(cigar.as_ref().iter().copied()); + new_cigar + } else { + let mut new_cigar = cigar.clone(); + new_cigar.as_mut().push(del); + new_cigar + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + #[allow(dead_code)] + struct TranscriptomeTest { + chrom_starts: Vec, + transcript_info: Vec, + exon_info: Vec, + } + + #[test] + fn test_cigar_segment() { + let cigar = Cigar::from(vec![ + Op::new(Kind::SoftClip, 5), + Op::new(Kind::Match, 10), + Op::new(Kind::Skip, 5), + Op::new(Kind::Match, 10), + Op::new(Kind::SoftClip, 5), + ]); + /* + let (left_clip, right_clip, splice_segments) = get_cigar_segments(&cigar, 5); + assert_eq!(left_clip, Cigar::from(vec![Op::new(Kind::SoftClip, 5)])); + assert_eq!(right_clip, Cigar::from(vec![Op::new(Kind::SoftClip, 5)])); + assert_eq!(splice_segments.len(), 2); + assert_eq!(splice_segments[0].start, 5); + assert_eq!(splice_segments[0].end, 15); + assert_eq!(splice_segments[1].start, 20); + assert_eq!(splice_segments[1].end, 30); + */ + } +} diff --git a/python/Cargo.toml b/python/Cargo.toml index f4c15ca..8112a2d 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -10,16 +10,13 @@ crate-type = ["cdylib"] [dependencies] anyhow = "1.0" -bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "8de06bcc0a2145fd819232ffb2bf100fb795db30" } -star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "faef1085eaf26e6e8d5875fcbc641c3af9444d89" } bstr = "1.0" -either = "1.13" itertools = "0.13" noodles = { version = "0.85", features = ["core", "fastq", "bam", "sam", "bgzf"] } -seqspec = { version = "0.1", workspace = true } +seqspec = { workspace = true } serde_yaml = "0.9" termtree = "0.5" -precellar = { version = "0.1", workspace = true } +precellar = { workspace = true } regex = "1.6" log = "0.4" env_logger = "0.11" diff --git a/python/src/aligner.rs b/python/src/aligner.rs new file mode 100644 index 0000000..62804b7 --- /dev/null +++ b/python/src/aligner.rs @@ -0,0 +1,35 @@ +use std::path::Path; + +use noodles::sam; +use precellar::align::{Aligner, BurrowsWheelerAligner, StarAligner}; +use seqspec::Modality; + +pub enum AlignerType { + STAR(StarAligner), + BWA(BurrowsWheelerAligner), +} + +impl AlignerType { + pub fn from_name>(name: &str, path: P) -> Self { + match name.to_lowercase().as_str() { + "star" => AlignerType::STAR(StarAligner::from_path(path)), + "bwa" => AlignerType::BWA(BurrowsWheelerAligner::from_path(path)), + _ => unimplemented!(), + } + } + + pub fn from_modality>(modality: Modality, path: P) -> Self { + match modality { + Modality::RNA => AlignerType::STAR(StarAligner::from_path(path)), + Modality::ATAC => AlignerType::BWA(BurrowsWheelerAligner::from_path(path)), + _ => unimplemented!(), + } + } + + pub fn header(&self) -> sam::Header { + match self { + AlignerType::STAR(aligner) => aligner.header(), + AlignerType::BWA(aligner) => aligner.header(), + } + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 90ebbcf..dd0cf6b 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,23 +1,24 @@ +mod aligner; mod pyseqspec; mod utils; +use aligner::AlignerType; + use anyhow::Result; -use bwa_mem2::{AlignerOpts, BurrowsWheelerAligner, FMIndex, PairedEndStats}; -use either::Either; use itertools::Itertools; use log::info; -use noodles::bam; -use noodles::fastq::io::Writer; -use noodles::{bgzf, sam::alignment::io::Write}; +use noodles::{ + bam, fastq, + sam::{self, alignment::io::Write}, +}; use pyo3::prelude::*; use std::{collections::HashMap, io::BufWriter, path::PathBuf, str::FromStr}; use ::precellar::{ - align::{ - extend_fastq_record, Aligner, Barcode, DummyAligner, FastqProcessor, NameCollatedRecords, - }, + align::{extend_fastq_record, Barcode, FastqProcessor, MultiMapR, NameCollatedRecords}, fragment::FragmentGenerator, qc::{AlignQC, FragmentQC, Metrics}, + transcript::{self, Quantifier}, }; use pyseqspec::Assay; use seqspec::{ @@ -43,8 +44,8 @@ static GLOBAL: Jemalloc = Jemalloc; /// File path to the genome index. #[pyfunction] fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { - FMIndex::new(fasta, genome_prefix).unwrap(); - Ok(()) + //FMIndex::new(fasta, genome_prefix).unwrap(); + todo!() } /// Align fastq reads to the reference genome and generate unique fragments. @@ -63,12 +64,16 @@ fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { /// File path to the output bam file. If None, the bam file will not be generated. /// output_fragment: Path | None /// File path to the output fragment file. If None, the fragment file will not be generated. +/// output_quantification: Path | None +/// File path to the directory to store the gene quantifications. If None, the gene quantification will not be generated. /// mito_dna: list[str] /// List of mitochondrial DNA names. /// shift_left: int /// The number of bases to shift the left end of the fragment. /// shift_right: int /// The number of bases to shift the right end of the fragment. +/// aligner: str | None +/// The aligner to use for the alignment. If None, the aligner will be inferred from the modality. /// compression: str | None /// The compression algorithm to use for the output fragment file. /// If None, the compression algorithm will be inferred from the file extension. @@ -79,8 +84,8 @@ fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { /// num_threads: int /// The number of threads to use. /// chunk_size: int -/// This parameter is used to control the number of bases processed in each chunk. -/// The actual value is determined by: chunk_size * num_threads. +/// This parameter is used to control the number of bases processed in each chunk per thread. +/// The total number of bases in each chunk is determined by: chunk_size * num_threads. /// /// Returns /// ------- @@ -90,16 +95,16 @@ fn make_genome_index(fasta: PathBuf, genome_prefix: PathBuf) -> Result<()> { #[pyo3( signature = ( assay, genome_index, *, - modality, output_bam=None, output_fragment=None, + modality, output_bam=None, output_quantification=None, output_fragment=None, mito_dna=vec!["chrM".to_owned(), "M".to_owned()], - shift_left=4, shift_right=-5, + shift_left=4, shift_right=-5, aligner=None, compression=None, compression_level=None, temp_dir=None, num_threads=8, chunk_size=10000000, ), text_signature = "(assay, genome_index, *, - modality, output_bam=None, output_fragment=None, + modality, output_bam=None, output_quantification=None, output_fragment=None, mito_dna=['chrM', 'M'], - shift_left=4, shift_right=-5, + shift_left=4, shift_right=-5, aligner=None, compression=None, compression_level=None, temp_dir=None, num_threads=8, chunk_size=10000000)", )] @@ -109,10 +114,12 @@ fn align( genome_index: PathBuf, modality: &str, output_bam: Option, + output_quantification: Option, output_fragment: Option, mito_dna: Vec, shift_left: i64, shift_right: i64, + aligner: Option<&str>, compression: Option<&str>, compression_level: Option, temp_dir: Option, @@ -120,23 +127,27 @@ fn align( chunk_size: usize, ) -> Result> { assert!( - output_bam.is_some() || output_fragment.is_some(), - "either output_bam or output_fragment must be provided" + output_bam.is_some() || output_fragment.is_some() || output_quantification.is_some(), + "one of the following parameters must be provided: output_bam, output_fragment, output_quantification" + ); + assert!( + output_fragment.is_none() || output_quantification.is_none(), + "output_fragment and output_quantification cannot be used together" ); let modality = Modality::from_str(modality).unwrap(); - let spec = match assay.extract::() { + let assay = match assay.extract::() { Ok(p) => seqspec::Assay::from_path(&p).unwrap(), _ => assay.extract::>()?.0.clone(), }; - let aligner = BurrowsWheelerAligner::new( - FMIndex::read(genome_index).unwrap(), - AlignerOpts::default(), - PairedEndStats::default(), - ); + let mut aligner = if let Some(name) = aligner { + AlignerType::from_name(name, &genome_index) + } else { + AlignerType::from_modality(modality, &genome_index) + }; let header = aligner.header(); - let mut processor = FastqProcessor::new(spec, aligner) + let mut processor = FastqProcessor::new(assay) .with_modality(modality) .with_barcode_correct_prob(0.9); let mut fragment_qc = FragmentQC::default(); @@ -144,8 +155,29 @@ fn align( processor.add_mito_dna(&x); fragment_qc.add_mito_dna(x); }); + let mut transcript_annotator = None; { + let alignments: Box> = match aligner { + AlignerType::STAR(ref mut aligner) => { + let transcriptome = ::precellar::align::read_transcriptome_star(&genome_index)?; + transcript_annotator = Some(::precellar::transcript::AlignmentAnnotator::new( + transcriptome, + )); + Box::new(processor.gen_barcoded_alignments( + aligner, + num_threads, + num_threads as usize * chunk_size, + )) + } + AlignerType::BWA(ref mut aligner) => Box::new(processor.gen_barcoded_alignments( + aligner, + num_threads, + num_threads as usize * chunk_size, + )), + }; + + // Write alignments let mut bam_writer = output_bam .map(|output| { let mut writer = @@ -154,23 +186,7 @@ fn align( anyhow::Ok(writer) }) .transpose()?; - let alignments = processor - .gen_barcoded_alignments(num_threads, chunk_size) - .map(|data| { - py.check_signals().unwrap(); - if let Some(writer) = &mut bam_writer { - match data.as_ref() { - Either::Left(chunk) => chunk - .iter() - .for_each(|x| writer.write_alignment_record(&header, x).unwrap()), - Either::Right(chunk) => chunk.iter().for_each(|(a, b)| { - writer.write_alignment_record(&header, a).unwrap(); - writer.write_alignment_record(&header, b).unwrap(); - }), - }; - } - data - }); + let alignments = write_alignments(py, &mut bam_writer, &header, alignments); let fragment_writer = output_fragment .as_ref() @@ -181,7 +197,13 @@ fn align( create_file(output, compression, compression_level, num_threads as u32) }) .transpose()?; - if let Some(mut writer) = fragment_writer { + + if let Some(quant_dir) = output_quantification { + // Write gene quantification + let quantifier = Quantifier::new(transcript_annotator.unwrap()); + quantifier.quantify(&header, alignments, quant_dir); + } else if let Some(mut writer) = fragment_writer { + // Write fragments let mut fragment_generator = FragmentGenerator::default(); if let Some(dir) = temp_dir { fragment_generator.set_temp_dir(dir); @@ -198,6 +220,10 @@ fn align( writeln!(writer.as_mut(), "{}", frag).unwrap(); }) }); + } else { + // alignments is a lazy iterator, so we need to consume it if no other + // output is generated. + alignments.for_each(drop); } } @@ -208,6 +234,29 @@ fn align( Ok(report.into()) } +fn write_alignments<'a>( + py: Python<'a>, + bam_writer: &'a mut Option>, + header: &'a sam::Header, + alignments: impl Iterator)>> + 'a, +) -> impl Iterator)>> + 'a { + alignments.map(move |data| { + py.check_signals().unwrap(); + if let Some(writer) = bam_writer.as_mut() { + data.iter().for_each(|(a, b)| { + a.iter() + .for_each(|x| writer.write_alignment_record(&header, x).unwrap()); + b.as_ref().map(|x| { + x.iter() + .for_each(|x| writer.write_alignment_record(&header, x).unwrap()) + }); + }); + } + data + }) +} + +/* #[pyfunction] #[pyo3( signature = ( @@ -258,8 +307,7 @@ fn make_fragment( let chunks = NameCollatedRecords::new(reader.records()) .map(|x| { - align_qc.update(&x.0, &header); - align_qc.update(&x.1, &header); + align_qc.add(&header, &x.0, Some(&x.1)).unwrap(); x }) .chunks(chunk_size); @@ -293,6 +341,7 @@ fn make_fragment( fragment_qc.report(&mut report); Ok(report.into()) } + */ /// Generate consolidated fastq files from the sequencing specification. /// The barcodes and UMIs are concatenated to the read 1 sequence. @@ -315,8 +364,7 @@ fn make_fastq( _ => assay.extract::>()?.0.clone(), }; - let aligner = DummyAligner; - let mut processor = FastqProcessor::new(spec, aligner).with_modality(modality); + let mut processor = FastqProcessor::new(spec).with_modality(modality); let fq_reader = processor.gen_barcoded_fastq(correct_barcode); info!( @@ -332,11 +380,11 @@ fn make_fastq( std::fs::create_dir_all(&out_dir)?; let read1_fq = out_dir.join("R1.fq.zst"); let read1_writer = create_file(read1_fq, Some(Compression::Zstd), None, 8)?; - let mut read1_writer = Writer::new(BufWriter::new(read1_writer)); + let mut read1_writer = fastq::Writer::new(BufWriter::new(read1_writer)); let mut read2_writer = if fq_reader.is_paired_end() { let read2_fq = out_dir.join("R2.fq.zst"); let read2_writer = create_file(read2_fq, Some(Compression::Zstd), None, 8)?; - let read2_writer = Writer::new(BufWriter::new(read2_writer)); + let read2_writer = fastq::Writer::new(BufWriter::new(read2_writer)); Some(read2_writer) } else { None @@ -380,7 +428,7 @@ fn precellar(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(make_genome_index, m)?)?; m.add_function(wrap_pyfunction!(align, m)?)?; - m.add_function(wrap_pyfunction!(make_fragment, m)?)?; + //m.add_function(wrap_pyfunction!(make_fragment, m)?)?; m.add_function(wrap_pyfunction!(make_fastq, m)?)?; utils::register_submodule(m)?; diff --git a/seqspec/src/lib.rs b/seqspec/src/lib.rs index 8e5d3d1..30af89e 100644 --- a/seqspec/src/lib.rs +++ b/seqspec/src/lib.rs @@ -5,8 +5,7 @@ pub mod utils; use log::warn; use noodles::fastq; use read::ReadValidator; -pub use read::RegionIndex; -pub use read::{File, Read, Strand, UrlType}; +pub use read::{SegmentInfo, SegmentInfoElem, File, Read, Strand, UrlType}; use read::{ReadSpan, ValidateResult}; use region::LibSpec; pub use region::{Onlist, Region, RegionId, RegionType, SequenceType}; @@ -323,14 +322,14 @@ impl Assay { } /// Get the index of atomic regions of each read in the sequence spec. - pub fn get_index_by_modality( + pub fn get_segments_by_modality( &self, modality: Modality, - ) -> impl Iterator { + ) -> impl Iterator { self.sequence_spec.values().filter_map(move |read| { if read.modality == modality { let parent_region_index = self - .get_index(&read.read_id) + .get_segments(&read.read_id) .unwrap_or_else(|| panic!("Cannot find index for Read: {}", read.read_id)); Some((read, parent_region_index)) } else { @@ -339,11 +338,11 @@ impl Assay { }) } - /// Get the index of atomic regions of a read in the sequence spec. - pub fn get_index(&self, read_id: &str) -> Option { + /// Get atomic regions of a read in the sequence spec. + pub fn get_segments(&self, read_id: &str) -> Option { let read = self.sequence_spec.get(read_id)?; let library_parent_region = self.library_spec.get_parent(&read.primer_id)?; - read.get_index(&library_parent_region.read().unwrap()) + read.get_segments(&library_parent_region.read().unwrap()) } pub fn iter_reads(&self, modality: Modality) -> impl Iterator { @@ -370,13 +369,13 @@ impl Assay { .get_parent(&read.primer_id) .ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id)) .unwrap(); - let index = read.get_index(®ion.read().unwrap())?; + let index = read.get_segments(®ion.read().unwrap())?; let regions: Vec<_> = index - .index + .segments .iter() - .map(|(region_id, _, range)| { - let region = self.library_spec.get(region_id).unwrap(); - (region.read().unwrap(), range.clone()) + .map(|elem_info| { + let region = self.library_spec.get(&elem_info.region_id).unwrap(); + (region.read().unwrap(), elem_info.range.clone()) }) .collect(); Some((reader, (read, regions))) @@ -459,13 +458,14 @@ impl Assay { Ok(()) } + /// Verify reads in the sequence spec. fn verify(&self, read: &Read) -> Result<()> { let region = self .library_spec .get_parent(&read.primer_id) .ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?; // Check if the primer exists - if let Some(index) = read.get_index(®ion.read().unwrap()) { + if let Some(index) = read.get_segments(®ion.read().unwrap()) { match index.readlen_info { ReadSpan::Covered | ReadSpan::Span(_) => {} ReadSpan::NotEnough => { @@ -487,11 +487,11 @@ impl Assay { if let Some(mut reader) = read.open() { let regions = index - .index + .segments .iter() - .map(|(region_id, _, range)| { - let region = self.library_spec.get(region_id).unwrap(); - (region.read().unwrap(), range) + .map(|info| { + let region = self.library_spec.get(&info.region_id).unwrap(); + (region.read().unwrap(), &info.range) }) .collect::>(); let mut validators = regions @@ -801,36 +801,36 @@ mod tests { let yaml_str = fs::read_to_string(YAML_FILE).expect("Failed to read file"); let assay: Assay = serde_yaml::from_str(&yaml_str).expect("Failed to parse YAML"); - for (read, index) in assay.get_index_by_modality(Modality::RNA) { + for (read, index) in assay.get_segments_by_modality(Modality::RNA) { println!( "{}: {:?}", read.read_id, index - .index + .segments .into_iter() - .map(|x| (x.1, x.2)) + .map(|x| (x.region_type, x.range)) .collect::>() ); } - for (read, index) in assay.get_index_by_modality(Modality::ATAC) { + for (read, index) in assay.get_segments_by_modality(Modality::ATAC) { println!( "{}: {:?}", read.read_id, index - .index + .segments .into_iter() - .map(|x| (x.1, x.2)) + .map(|x| (x.region_type, x.range)) .collect::>() ); } - for (read, index) in assay.get_index_by_modality(Modality::Protein) { + for (read, index) in assay.get_segments_by_modality(Modality::Protein) { println!( "{}: {:?}", read.read_id, index - .index + .segments .into_iter() - .map(|x| (x.1, x.2)) + .map(|x| (x.region_type, x.range)) .collect::>() ); } diff --git a/seqspec/src/read.rs b/seqspec/src/read.rs index 0e64b1f..4b3778e 100644 --- a/seqspec/src/read.rs +++ b/seqspec/src/read.rs @@ -15,6 +15,7 @@ use std::{ ops::Range, }; +/// Specification of a sequencing library. #[derive(Debug, Clone, PartialEq)] pub struct SeqSpec(IndexMap); @@ -110,48 +111,41 @@ impl Read { Ok(record.sequence().len()) } - pub(crate) fn get_index<'a>(&'a self, region: &'a Region) -> Option { + /// Check if the read is reverse. + pub fn is_reverse(&self) -> bool { + match self.strand { + Strand::Neg => true, + Strand::Pos => false, + } + } + + pub(crate) fn get_segments<'a>(&'a self, region: &'a Region) -> Option { if region.sequence_type != SequenceType::Joined { return None; } let mut found_primer = false; - - let result = if self.is_reverse() { - self.get_read_span( - region - .subregions - .iter() - .rev() - .skip_while(|region| { - let region = region.read().unwrap(); - let found = region.region_type.is_sequencing_primer() - && region.region_id == self.primer_id; - if found { - found_primer = true; - } - !found - }) - .skip(1), - ) + let subregions = region.subregions.iter(); + let subregions: Box> = if self.is_reverse() { + Box::new(subregions.rev()) } else { - self.get_read_span( - region - .subregions - .iter() - .skip_while(|region| { - let region = region.read().unwrap(); - let found = region.region_type.is_sequencing_primer() - && region.region_id == self.primer_id; - if found { - found_primer = true; - } - !found - }) - .skip(1), - ) + Box::new(subregions) }; + let result = self.get_read_span( + subregions + .skip_while(|region| { + let region = region.read().unwrap(); + let found = region.region_type.is_sequencing_primer() + && region.region_id == self.primer_id; + if found { + found_primer = true; + } + !found + }) + .skip(1), + ); + if found_primer { Some(result) } else { @@ -160,74 +154,162 @@ impl Read { } /// Helper function to get the region index for a read. - fn get_read_span<'a, I>(&self, mut regions: I) -> RegionIndex + fn get_read_span<'a, I>(&self, mut regions: I) -> SegmentInfo where I: Iterator>>, { - let mut index = Vec::new(); + let mut segments = Vec::new(); let read_len = self.max_len; let mut cur_pos = 0; let mut readlen_info = ReadSpan::Covered; while let Some(region) = regions.next() { - let region = region.read().unwrap(); - let region_id = region.region_id.clone(); - let region_type = region.region_type; + let mut region = region.read().unwrap(); + let mut region_id = region.region_id.clone(); + let mut region_type = SegmentType::R(region.region_type); if region.is_fixed_length() { // Fixed-length region let end = cur_pos + region.min_len; if end >= read_len { - index.push((region_id, region_type, cur_pos..read_len)); + segments.push(SegmentInfoElem::new( + region_id, + region_type, + cur_pos..read_len, + )); if end > read_len { readlen_info = ReadSpan::NotEnough; } break; } else { - index.push((region_id, region_type, cur_pos..end)); + segments.push(SegmentInfoElem::new(region_id, region_type, cur_pos..end)); cur_pos = end; } - } else if cur_pos + region.min_len >= read_len { - // Variable-length region and read is shorter - index.push((region_id, region_type, cur_pos..read_len)); - readlen_info = ReadSpan::Span((read_len - cur_pos) as usize); - break; - } else if cur_pos + region.max_len < read_len { - // Variable-length region and read is longer than max length - index.push((region_id, region_type, cur_pos..cur_pos + region.max_len)); - if let Some(next_region) = regions.next() { - let next_region = next_region.read().unwrap(); - readlen_info = ReadSpan::ReadThrough(next_region.region_id.clone()); - } - break; } else { - // Variable-length region and read is within the length range - index.push((region_id, region_type, cur_pos..read_len)); - if let Some(next_region) = regions.next() { - let next_region = next_region.read().unwrap(); - readlen_info = ReadSpan::MayReadThrough(next_region.region_id.clone()); + // Variable-length region + if let Some(nucl) = region.region_type.is_poly_nucl() { + if let Some(next_region) = regions.next() { + region = next_region.read().unwrap(); + region_id = region.region_id.clone(); + region_type = SegmentType::PolyNucl((nucl, region.region_type)); + } + } + if cur_pos + region.min_len >= read_len { + // Variable-length region and read is shorter + segments.push(SegmentInfoElem::new( + region_id, + region_type, + cur_pos..read_len, + )); + readlen_info = ReadSpan::Span((read_len - cur_pos) as usize); + break; + } else if cur_pos + region.max_len < read_len { + // Variable-length region and read is longer than max length + segments.push(SegmentInfoElem::new( + region_id, + region_type, + cur_pos..cur_pos + region.max_len, + )); + if let Some(next_region) = regions.next() { + let next_region = next_region.read().unwrap(); + readlen_info = ReadSpan::ReadThrough(next_region.region_id.clone()); + } + break; + } else { + // Variable-length region and read is within the length range + segments.push(SegmentInfoElem::new( + region_id, + region_type, + cur_pos..read_len, + )); + if let Some(next_region) = regions.next() { + let next_region = next_region.read().unwrap(); + readlen_info = ReadSpan::MayReadThrough(next_region.region_id.clone()); + } + break; } - break; } } - RegionIndex { - index, + SegmentInfo { + segments, readlen_info, } } +} - pub fn is_reverse(&self) -> bool { - match self.strand { - Strand::Neg => true, - Strand::Pos => false, +/// A read may be divided into multiple segments, each corresponding to a region +/// with biological meaning. SegmentInfo stores the information about the regions +/// that the read is divided into. +#[derive(Debug, Clone)] +pub struct SegmentInfo { + pub segments: Vec, + pub readlen_info: ReadSpan, +} + +#[derive(Clone)] +pub enum SegmentType { + R(RegionType), + PolyNucl((u8, RegionType)), // (nucleotide, region_type) +} + +impl std::fmt::Debug for SegmentType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SegmentType::R(region_type) => write!(f, "{:?}", region_type), + SegmentType::PolyNucl((nucl, region_type)) => { + write!(f, "poly{}+{:?}", *nucl as char, region_type) + } + } + } +} + +impl SegmentType { + pub fn is_barcode(&self) -> bool { + match self { + SegmentType::R(region_type) => region_type.is_barcode(), + _ => false, + } + } + + pub fn is_umi(&self) -> bool { + match self { + SegmentType::R(region_type) => region_type.is_umi(), + _ => false, + } + } + + pub fn is_target(&self) -> bool { + match self { + SegmentType::R(region_type) => region_type.is_target(), + SegmentType::PolyNucl((_, region_type)) => region_type.is_target(), + } + } + + pub fn poly_nucl(&self) -> Option { + match self { + SegmentType::PolyNucl((nucl, _)) => Some(*nucl), + _ => None, } } } +/// Information about a segment in a read. #[derive(Debug, Clone)] -pub struct RegionIndex { - pub index: Vec<(String, RegionType, Range)>, - pub readlen_info: ReadSpan, +pub struct SegmentInfoElem { + pub region_id: String, + pub region_type: SegmentType, + pub range: Range, +} + +impl SegmentInfoElem { + pub fn new(region_id: String, region_type: SegmentType, range: Range) -> Self { + Self { + region_id, + region_type: region_type, + range, + } + } } +/// Information about the region index for a read. #[derive(Debug, Clone)] pub enum ReadSpan { Covered, // The read is fully contained within the target region diff --git a/seqspec/src/region.rs b/seqspec/src/region.rs index 9235dbf..e62ddec 100644 --- a/seqspec/src/region.rs +++ b/seqspec/src/region.rs @@ -237,6 +237,16 @@ impl RegionType { matches!(self, RegionType::Gdna | RegionType::Cdna) } + pub fn is_poly_nucl(&self) -> Option { + match self { + RegionType::PolyA => Some(b'A'), + RegionType::PolyG => Some(b'G'), + RegionType::PolyT => Some(b'T'), + RegionType::PolyC => Some(b'C'), + _ => None, + } + } + pub fn is_sequencing_primer(&self) -> bool { matches!( self, diff --git a/seqspec_templates/10x_rna_atac.yaml b/seqspec_templates/10x_rna_atac.yaml index 6946757..43a2856 100644 --- a/seqspec_templates/10x_rna_atac.yaml +++ b/seqspec_templates/10x_rna_atac.yaml @@ -20,7 +20,7 @@ sequence_spec: modality: rna primer_id: rna-truseq_read1 min_len: 28 - max_len: 28 + max_len: 98 strand: pos - !Read read_id: rna-I1 @@ -134,11 +134,22 @@ library_spec: onlist: null regions: null parent_id: rna + - !Region + parent_id: rna + region_id: rna-polyT + region_type: poly_t + name: rna-polyT + sequence_type: random + sequence: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + min_len: 10 + max_len: 250 + onlist: null + regions: null - !Region parent_id: rna region_id: rna-cDNA region_type: cdna - name: RNA-cDNA + name: rna-cDNA sequence_type: random sequence: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX min_len: 1