Skip to content

Commit

Permalink
path normalizatioin
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Oct 18, 2024
1 parent 6b8ae88 commit c722a09
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 23 deletions.
7 changes: 6 additions & 1 deletion python/src/pyseqspec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Assay {
signature = (read_id, *, modality=None, primer_id=None, is_reverse=None, fastq=None, min_len=None, max_len=None),
text_signature = "($self, read_id, *, modality=None, primer_id=None, is_reverse=None, fastq=None, min_len=None, max_len=None)",
)]
pub fn update_read(
fn update_read(
&mut self,
read_id: &str,
modality: Option<&str>,
Expand Down Expand Up @@ -125,6 +125,11 @@ impl Assay {
self.0.delete_read(read_id);
}

fn validate(&self, modality: &str, out_dir: PathBuf) -> Result<()> {
let modality = Modality::from_str(modality)?;
self.0.iter_reads(modality).try_for_each(|read| self.0.validate(read, &out_dir))
}

/*
/// Identify the position of elements in a spec.
///
Expand Down
1 change: 1 addition & 0 deletions seqspec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ cached-path = "0.6"
flate2 = "1.0"
indexmap = "2.5"
log = "0.4"
hamming = "0.1"
noodles = { version = "0.80", features = ["core", "fastq", "async"] }
reqwest = { version = "0.12", features = ["blocking"] }
serde = { version = "1.0", features = ["derive", "rc"] }
Expand Down
77 changes: 69 additions & 8 deletions seqspec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ mod region;
pub mod utils;

use log::warn;
use read::{ReadSpan, RegionIndex};
use read::{ReadSpan, RegionIndex, ValidateResult};
pub use read::{Read, File, UrlType, Strand};
use read::ReadValidator;
use region::LibSpec;
pub use region::{Region, RegionType, SequenceType, Onlist};
use noodles::fastq;

use serde::{Deserialize, Deserializer, Serialize};
use serde_yaml::{self, Value};
use utils::open_file_for_write;
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{bail, anyhow, Result};
use std::path::Path;
Expand Down Expand Up @@ -50,6 +52,7 @@ impl Assay {
}

/// Normalize all paths in the sequence spec.
/// This is used to bring relative paths to absolute paths.
pub fn normalize_all_paths(&mut self) {
if let Some(file) = &self.file {
let base_dir = file.parent().unwrap();
Expand All @@ -62,6 +65,13 @@ impl Assay {
});
}
});
self.library_spec.regions_mut().for_each(|region| {
if let Some(onlist) = &mut Arc::<Region>::get_mut(region).unwrap().onlist {
if let Err(e) = onlist.normalize_path(base_dir) {
warn!("{}", e);
}
}
});
}
}

Expand All @@ -75,6 +85,15 @@ impl Assay {
});
}
});
/*
self.library_spec.regions_mut().for_each(|region| {
if let Some(onlist) = &mut Arc::<Region>::get_mut(region).unwrap().onlist {
if let Err(e) = onlist.unnormalize_path(base_dir.as_ref()) {
warn!("Failed to unnormalize path: {}", e);
}
}
});
*/
}

/// Add default Illumina reads to the sequence spec.
Expand Down Expand Up @@ -223,7 +242,7 @@ impl Assay {
read_buffer.max_len = max_len.unwrap() as u32;
}

self.validate_reads(&read_buffer)?;
self.verify(&read_buffer)?;
self.sequence_spec.insert(read_id.to_string(), read_buffer);

Ok(())
Expand Down Expand Up @@ -264,7 +283,46 @@ impl Assay {
self.sequence_spec.values().filter(move |read| read.modality == modality)
}

pub fn validate_reads(&self, read: &Read) -> Result<()> {
/// Demultiplex reads into separate files based on the primer ID.
pub fn validate<P: AsRef<Path>>(&self, read: &Read, dir: P) -> Result<()> {
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?;
if let Some(index) = read.get_index(region) {
let output_valid = dir.as_ref().join(format!("{}.fq.zst", read.read_id));
let output_valid = open_file_for_write(output_valid, None, None, 8)?;
let mut output_valid = fastq::io::Writer::new(output_valid);
let output_other = dir.as_ref().join(format!("Invalid_{}.fq.zst", read.read_id));
let output_other = open_file_for_write(output_other, None, None, 8)?;
let mut output_other = fastq::io::Writer::new(output_other);
if let Some(mut reader) = read.open() {
let mut validators: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let region = self.library_spec.get(region_id).unwrap();
ReadValidator::new(region)
.with_range(range.start as usize ..range.end as usize)
.with_strand(read.strand)
}).collect();

reader.records().try_for_each(|record| {
let record = record?;
let valid = validators.iter_mut().all(|validator| {
match validator.validate(record.sequence()) {
ValidateResult::OnlistFail | ValidateResult::Valid => true,
_ => false,
}
});
if valid {
output_valid.write_record(&record)?;
} else {
output_other.write_record(&record)?;
}
anyhow::Ok(())
})?;
}
}
Ok(())
}

fn verify(&self, read: &Read) -> Result<()> {
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?;
// Check if the primer exists
Expand Down Expand Up @@ -296,17 +354,20 @@ impl Assay {
reader.records().take(500).try_for_each(|record| {
let record = record?;
for validator in &mut validators {
validator.validate(record.sequence())?;
let result = validator.validate(record.sequence());
match result {
ValidateResult::TooLong(_) | ValidateResult::TooShort(_) => {
bail!("{}", result);
}
_ => {},
}
}
anyhow::Ok(())
})?;

for validator in validators {
let percent_matched = validator.frac_matched() * 100.0;
if percent_matched < 5.0 {
bail!("Read '{}' failed validation for region '{}'. \
Percentage of matched bases: {:.2}%", read.read_id, validator.id(), percent_matched);
} else if percent_matched < 50.0 {
if percent_matched < 50.0 {
warn!("Read '{}' has low percentage of matched bases for region '{}'. \
Percentage of matched bases: {:.2}%", read.read_id, validator.id(), percent_matched);
}
Expand Down
78 changes: 65 additions & 13 deletions seqspec/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,21 @@ pub(crate) struct ReadValidator<'a> {
n_matched: usize,
onlist: Option<HashSet<String>>,
strand: Strand,
tolerance: f64,
}

impl<'a> ReadValidator<'a> {
pub fn id(&self) -> &str {
&self.region.region_id
}

pub fn sequence_type(&self) -> SequenceType {
self.region.sequence_type
}

pub fn new(region: &'a Region) -> Self {
let onlist = if let Some(onlist) = &region.onlist {
Some(onlist.read().unwrap())
} else if region.sequence_type == SequenceType::Fixed {
Some(HashSet::from([region.sequence.clone()]))
} else {
None
};
Expand All @@ -307,6 +310,7 @@ impl<'a> ReadValidator<'a> {
n_matched: 0,
onlist,
strand: Strand::Pos,
tolerance: 0.2,
}
}

Expand All @@ -324,36 +328,84 @@ impl<'a> ReadValidator<'a> {
self.n_matched as f64 / self.n_total as f64
}

pub fn validate(&mut self, seq: &[u8]) -> Result<()> {
/// Validate a sequence. Return true if the sequence is valid.
/// For fixed sequences, the sequence is checked against the fixed sequence with
/// certain tolerance (default is 1 mismatches).
/// Note that for onlist, we check for the exact match.
pub fn validate(&mut self, seq: &[u8]) -> ValidateResult {
self.n_total += 1;
let seq = if let Some(range) = &self.range {
&seq[range.clone()]
} else {
seq
};
if seq.len() < self.region.min_len as usize {
return Err(anyhow::anyhow!("Sequence too short: {}", seq.len()));
}
if seq.len() > self.region.max_len as usize {
return Err(anyhow::anyhow!("Sequence too long: {}", seq.len()));
}
if let Some(onlist) = &self.onlist {
let len = seq.len();
if len < self.region.min_len as usize {
ValidateResult::TooShort((seq.len(), self.region.min_len as usize))
} else if seq.len() > self.region.max_len as usize {
ValidateResult::TooLong((seq.len(), self.region.max_len as usize))
} else if self.sequence_type() == SequenceType::Fixed {
let d = hamming::distance(seq, self.region.sequence.as_bytes()) as f64;
if d <= self.tolerance * len as f64 {
self.n_matched += 1;
ValidateResult::Valid
} else {
ValidateResult::MisMatch(d as usize)
}
} else if let Some(onlist) = &self.onlist {
match self.strand {
Strand::Neg => {
let seq = crate::utils::rev_compl(seq);
if onlist.contains(std::str::from_utf8(&seq)?) {
if onlist.contains(std::str::from_utf8(&seq).unwrap()) {
self.n_matched += 1;
ValidateResult::Valid
} else {
ValidateResult::OnlistFail
}
},
Strand::Pos => {
if onlist.contains(std::str::from_utf8(seq)?) {
if onlist.contains(std::str::from_utf8(seq).unwrap()) {
self.n_matched += 1;
ValidateResult::Valid
} else {
ValidateResult::OnlistFail
}
},
}
} else {
self.n_matched += 1;
ValidateResult::Valid
}
}
}

#[derive(Debug, Clone)]
pub(crate) enum ValidateResult {
TooShort((usize, usize)),
TooLong((usize, usize)),
OnlistFail,
MisMatch(usize),
Valid,
}

impl std::fmt::Display for ValidateResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ValidateResult::TooShort((n, min_len)) => {
write!(f, "Sequence too short: {} < {}", n, min_len)
},
ValidateResult::TooLong((n, max_len)) => {
write!(f, "Sequence too long: {} > {}", n, max_len)
},
ValidateResult::OnlistFail => {
write!(f, "Not on the onlist")
},
ValidateResult::MisMatch(n) => {
write!(f, "Mismatch: {}", n)
},
ValidateResult::Valid => {
write!(f, "Valid")
},
}
Ok(())
}
}
22 changes: 21 additions & 1 deletion seqspec/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::read::UrlType;
use cached_path::Cache;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use std::{collections::{HashMap, HashSet}, io::BufRead, sync::Arc};
use std::{collections::{HashMap, HashSet}, io::BufRead, path::Path, sync::Arc};
use anyhow::Result;

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -66,6 +66,10 @@ impl LibSpec {
self.region_map.values()
}

pub fn regions_mut(&mut self) -> impl Iterator<Item = &mut Arc<Region>> {
self.region_map.values_mut()
}

pub fn get_modality(&self, modality: &Modality) -> Option<&Arc<Region>> {
self.modalities.get(modality)
}
Expand Down Expand Up @@ -248,6 +252,22 @@ impl Onlist {
let reader = std::io::BufReader::new(crate::utils::open_file_for_read(file));
Ok(reader.lines().map(|x| x.unwrap()).collect())
}

pub(crate) fn normalize_path<P: AsRef<Path>>(&mut self, work_dir: P) -> Result<()> {
if self.urltype == UrlType::Local {
self.url = crate::utils::normalize_path(work_dir, &mut self.url)?
.to_str().unwrap().to_owned();
}
Ok(())
}

pub(crate) fn unnormalize_path<P: AsRef<Path>>(&mut self, work_dir: P) -> Result<()> {
if self.urltype == UrlType::Local {
self.url = crate::utils::unnormalize_path(work_dir, &mut self.url)?
.to_str().unwrap().to_owned();
}
Ok(())
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq)]
Expand Down

0 comments on commit c722a09

Please sign in to comment.