Skip to content

Commit

Permalink
use RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Oct 18, 2024
1 parent c722a09 commit 8665d63
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 67 deletions.
8 changes: 4 additions & 4 deletions precellar/src/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ impl<A: Alinger> FastqProcessor<A> {
}

fn get_whitelist(&self) -> Result<Whitelist> {
let regions: Vec<_> = self.assay.library_spec.get_modality(&self.modality()).unwrap()
.subregions.iter().filter(|r| r.region_type.is_barcode()).collect();
let regions = self.assay.library_spec.get_modality(&self.modality()).unwrap().read().unwrap();
let regions: Vec<_> = regions.subregions.iter().filter(|r| r.read().unwrap().region_type.is_barcode()).collect();
if regions.len() != 1 {
bail!("Expecting exactly one barcode region, found {}", regions.len());
}
let region = regions[0];
if region.sequence_type == SequenceType::Onlist {
Ok(Whitelist::new(region.onlist.as_ref().unwrap().read()?))
if region.read().unwrap().sequence_type == SequenceType::Onlist {
Ok(Whitelist::new(region.read().unwrap().onlist.as_ref().unwrap().read()?))
} else {
Ok(Whitelist::empty())
}
Expand Down
4 changes: 2 additions & 2 deletions python/src/pyseqspec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Assay {
}

let tree = Tree::new("".to_string()).with_leaves(
assay.library_spec.modalities().map(|region| build_tree(region, &read_list))
assay.library_spec.modalities().map(|region| build_tree(&region.read().unwrap(), &read_list))
);
format!("{}", tree)
}
Expand All @@ -212,7 +212,7 @@ fn build_tree(region: &Region, read_list: &HashMap<String, Vec<&Read>>) -> Tree<
format!("{}({})", id, len)
};
Tree::new(label)
.with_leaves(region.subregions.iter().map(|child| build_tree(child, read_list)))
.with_leaves(region.subregions.iter().map(|child| build_tree(&child.read().unwrap(), read_list)))
}

fn format_read(read: &Read) -> String {
Expand Down
53 changes: 31 additions & 22 deletions seqspec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use noodles::fastq;
use serde::{Deserialize, Deserializer, Serialize};
use serde_yaml::{self, Value};
use utils::open_file_for_write;
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use std::{fs, path::PathBuf, str::FromStr, sync::{Arc, RwLock}};
use anyhow::{bail, anyhow, Result};
use std::path::Path;

Expand Down Expand Up @@ -65,8 +65,8 @@ impl Assay {
});
}
});
self.library_spec.regions_mut().for_each(|region| {
if let Some(onlist) = &mut Arc::<Region>::get_mut(region).unwrap().onlist {
self.library_spec.regions().for_each(|region| {
if let Some(onlist) = &mut region.write().unwrap().onlist {
if let Err(e) = onlist.normalize_path(base_dir) {
warn!("{}", e);
}
Expand All @@ -85,23 +85,22 @@ impl Assay {
});
}
});
/*
self.library_spec.regions_mut().for_each(|region| {
if let Some(onlist) = &mut Arc::<Region>::get_mut(region).unwrap().onlist {
self.library_spec.regions().for_each(|region| {
if let Some(onlist) = &mut region.write().unwrap().onlist {
if let Err(e) = onlist.unnormalize_path(base_dir.as_ref()) {
warn!("Failed to unnormalize path: {}", e);
}
}
});
*/
}

/// Add default Illumina reads to the sequence spec.
pub fn add_illumina_reads(&mut self, modality: Modality, read_len: usize, forward_strand_workflow: bool) -> Result<()> {
fn advance_until(iterator: &mut std::slice::Iter<'_, Arc<Region>>, f: fn(&Region) -> bool) -> Option<(Arc<Region>, Vec<Arc<Region>>)> {
fn advance_until(iterator: &mut std::slice::Iter<'_, Arc<RwLock<Region>>>, f: fn(&Region) -> bool) -> Option<(Arc<RwLock<Region>>, Vec<Arc<RwLock<Region>>>)> {
let mut regions = Vec::new();
while let Some(next_region) = iterator.next() {
if f(next_region) {
let r = next_region.read().unwrap();
if f(&r) {
return Some((next_region.clone(), regions))
} else {
regions.push(next_region.clone());
Expand All @@ -110,15 +109,15 @@ impl Assay {
None
}

fn get_length(regions: &[Arc<Region>], reverse: bool) -> usize {
fn get_length(regions: &[Arc<RwLock<Region>>], reverse: bool) -> usize {
if reverse {
regions.iter()
.skip_while(|region| region.sequence_type == SequenceType::Fixed)
.map(|region| region.len().unwrap() as usize).sum()
.skip_while(|region| region.read().unwrap().sequence_type == SequenceType::Fixed)
.map(|region| region.read().unwrap().len().unwrap() as usize).sum()
} else {
regions.iter().rev()
.skip_while(|region| region.sequence_type == SequenceType::Fixed)
.map(|region| region.len().unwrap() as usize).sum()
.skip_while(|region| region.read().unwrap().sequence_type == SequenceType::Fixed)
.map(|region| region.read().unwrap().len().unwrap() as usize).sum()
}
}

Expand All @@ -140,10 +139,13 @@ impl Assay {

self.delete_all_reads(modality);
let regions = self.library_spec.get_modality(&modality).ok_or_else(|| anyhow!("Modality not found: {:?}", modality))?.clone();
let regions = regions.read().unwrap();
let mut regions = regions.subregions.iter();
while let Some(current_region) = regions.next() {
let current_region = current_region.read().unwrap();
if is_p5(&current_region) {
if let Some((next_region, acc)) = advance_until(&mut regions, is_read1) {
let next_region = next_region.read().unwrap();
self.update_read::<PathBuf>(
&format!("{}-R1", modality.to_string()),
Some(modality),
Expand All @@ -155,7 +157,7 @@ impl Assay {
let acc_len = get_length(acc.as_slice(), false);
if acc_len > 0 {
self.update_read::<PathBuf>(
&format!("{}-I1", modality.to_string()),
&format!("{}-I2", modality.to_string()),
Some(modality),
Some(&current_region.region_id),
Some(false),
Expand All @@ -166,7 +168,7 @@ impl Assay {
let acc_len = get_length(acc.as_slice(), true);
if acc_len > 0 {
self.update_read::<PathBuf>(
&format!("{}-I1", modality.to_string()),
&format!("{}-I2", modality.to_string()),
Some(modality),
Some(&next_region.region_id),
Some(true),
Expand All @@ -187,7 +189,7 @@ impl Assay {
)?;
if acc_len > 0 {
self.update_read::<PathBuf>(
&format!("{}-I2", modality.to_string()),
&format!("{}-I1", modality.to_string()),
Some(modality),
Some(&current_region.region_id),
Some(false),
Expand Down Expand Up @@ -276,7 +278,7 @@ impl Assay {
pub fn get_index(&self, read_id: &str) -> Option<RegionIndex> {
let read = self.sequence_spec.get(read_id)?;
let region = self.library_spec.get_parent(&read.primer_id)?;
read.get_index(region)
read.get_index(&region.read().unwrap())
}

pub fn iter_reads(&self, modality: Modality) -> impl Iterator<Item = &Read> {
Expand All @@ -287,16 +289,20 @@ impl Assay {
pub fn validate<P: AsRef<Path>>(&self, read: &Read, dir: P) -> Result<()> {
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?;
if let Some(index) = read.get_index(region) {
if let Some(index) = read.get_index(&region.read().unwrap()) {
fs::create_dir_all(&dir)?;
let output_valid = dir.as_ref().join(format!("{}.fq.zst", read.read_id));
let output_valid = open_file_for_write(output_valid, None, None, 8)?;
let mut output_valid = fastq::io::Writer::new(output_valid);
let output_other = dir.as_ref().join(format!("Invalid_{}.fq.zst", read.read_id));
let output_other = open_file_for_write(output_other, None, None, 8)?;
let mut output_other = fastq::io::Writer::new(output_other);
if let Some(mut reader) = read.open() {
let mut validators: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let regions: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let region = self.library_spec.get(region_id).unwrap();
(region.read().unwrap(), range)
}).collect();
let mut validators: Vec<_> = regions.iter().map(|(region, range)| {
ReadValidator::new(region)
.with_range(range.start as usize ..range.end as usize)
.with_strand(read.strand)
Expand Down Expand Up @@ -326,7 +332,7 @@ impl Assay {
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?;
// Check if the primer exists
if let Some(index) = read.get_index(region) {
if let Some(index) = read.get_index(&region.read().unwrap()) {
match index.readlen_info {
ReadSpan::Covered | ReadSpan::Span(_) => {},
ReadSpan::NotEnough => {
Expand All @@ -344,8 +350,11 @@ impl Assay {
}

if let Some(mut reader) = read.open() {
let mut validators: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let regions = index.index.iter().map(|(region_id, _, range)| {
let region = self.library_spec.get(region_id).unwrap();
(region.read().unwrap(), range)
}).collect::<Vec<_>>();
let mut validators: Vec<_> = regions.iter().map(|(region, range)| {
ReadValidator::new(region)
.with_range(range.start as usize ..range.end as usize)
.with_strand(read.strand)
Expand Down
9 changes: 7 additions & 2 deletions seqspec/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use indexmap::IndexMap;
use serde::{Deserialize, Serialize, Serializer};
use std::collections::HashSet;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::{io::{BufRead, BufReader}, ops::Range};
use anyhow::Result;
use std::path::Path;
Expand Down Expand Up @@ -107,6 +107,7 @@ impl Read {
self.get_read_span(
region.subregions.iter().rev()
.skip_while(|region| {
let region = region.read().unwrap();
let found = region.region_type.is_sequencing_primer() && region.region_id == self.primer_id;
if found {
found_primer = true;
Expand All @@ -118,6 +119,7 @@ impl Read {
self.get_read_span(
region.subregions.iter()
.skip_while(|region| {
let region = region.read().unwrap();
let found = region.region_type.is_sequencing_primer() && region.region_id == self.primer_id;
if found {
found_primer = true;
Expand All @@ -137,13 +139,14 @@ impl Read {
/// Get the regions of the read.
fn get_read_span<'a, I>(&self, mut regions: I) -> RegionIndex
where
I: Iterator<Item = &'a Arc<Region>>,
I: Iterator<Item = &'a Arc<RwLock<Region>>>,
{
let mut index = Vec::new();
let read_len = self.max_len;
let mut cur_pos = 0;
let mut readlen_info = ReadSpan::Covered;
while let Some(region) = regions.next() {
let region = region.read().unwrap();
let region_id = region.region_id.clone();
let region_type = region.region_type;
if region.is_fixed_length() { // Fixed-length region
Expand All @@ -165,12 +168,14 @@ impl Read {
} else if cur_pos + region.max_len < read_len { // Variable-length region and read is longer than max length
index.push((region_id, region_type, cur_pos..cur_pos + region.max_len));
if let Some(next_region) = regions.next() {
let next_region = next_region.read().unwrap();
readlen_info = ReadSpan::ReadThrough(next_region.region_id.clone());
}
break;
} else { // Variable-length region and read is within the length range
index.push((region_id, region_type, cur_pos..read_len));
if let Some(next_region) = regions.next() {
let next_region = next_region.read().unwrap();
readlen_info = ReadSpan::MayReadThrough(next_region.region_id.clone());
}
break;
Expand Down
Loading

0 comments on commit 8665d63

Please sign in to comment.