Skip to content

Commit

Permalink
fix validate
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Oct 19, 2024
1 parent 8665d63 commit d59119c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 37 deletions.
2 changes: 1 addition & 1 deletion python/src/pyseqspec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Assay {

fn validate(&self, modality: &str, out_dir: PathBuf) -> Result<()> {
let modality = Modality::from_str(modality)?;
self.0.iter_reads(modality).try_for_each(|read| self.0.validate(read, &out_dir))
self.0.validate(modality, out_dir)
}

/*
Expand Down
1 change: 1 addition & 0 deletions seqspec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ log = "0.4"
hamming = "0.1"
noodles = { version = "0.80", features = ["core", "fastq", "async"] }
reqwest = { version = "0.12", features = ["blocking"] }
rayon = "1.10"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_yaml = "0.9"
zstd = { version = "0.13", features = ["zstdmt"] }
Expand Down
100 changes: 64 additions & 36 deletions seqspec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod region;
pub mod utils;

use log::warn;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator};
use read::{ReadSpan, RegionIndex, ValidateResult};
pub use read::{Read, File, UrlType, Strand};
use read::ReadValidator;
Expand All @@ -12,7 +13,7 @@ use noodles::fastq;

use serde::{Deserialize, Deserializer, Serialize};
use serde_yaml::{self, Value};
use utils::open_file_for_write;
use utils::{open_file_for_write, Compression};
use std::{fs, path::PathBuf, str::FromStr, sync::{Arc, RwLock}};
use anyhow::{bail, anyhow, Result};
use std::path::Path;
Expand Down Expand Up @@ -285,45 +286,72 @@ impl Assay {
self.sequence_spec.values().filter(move |read| read.modality == modality)
}

/// Demultiplex reads into separate files based on the primer ID.
pub fn validate<P: AsRef<Path>>(&self, read: &Read, dir: P) -> Result<()> {
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id))?;
if let Some(index) = read.get_index(&region.read().unwrap()) {
fs::create_dir_all(&dir)?;
/// Validate reads in the sequence spec based on the fixed sequences and
/// save the valid and invalid reads to different files.
pub fn validate<P: AsRef<Path>>(&self, modality: Modality, dir: P) -> Result<()> {
fs::create_dir_all(&dir)?;
let (mut readers, reads): (Vec<_>, Vec<_>) = self.iter_reads(modality).flat_map(|read| {
let reader = read.open()?;
let region = self.library_spec.get_parent(&read.primer_id)
.ok_or_else(|| anyhow!("Primer not found: {}", read.primer_id)).unwrap();
let index = read.get_index(&region.read().unwrap())?;
let regions: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let region = self.library_spec.get(region_id).unwrap();
(region.read().unwrap(), range.clone())
}).collect();
Some((reader, (read, regions)))
}).collect();

let mut validators: Vec<_> = reads.iter().map(|(read, regions)|
regions.iter().map(|(region, range)| {
ReadValidator::new(region)
.with_range(range.start as usize ..range.end as usize)
.with_strand(read.strand)
}).collect::<Vec<_>>()
).collect();

let mut outputs: Vec<_> = reads.iter().map(|(read, _)| {
let output_valid = dir.as_ref().join(format!("{}.fq.zst", read.read_id));
let output_valid = open_file_for_write(output_valid, None, None, 8)?;
let mut output_valid = fastq::io::Writer::new(output_valid);
let output_valid = open_file_for_write(output_valid, Some(Compression::Zstd), Some(9), 8)?;
let output_valid = fastq::io::Writer::new(output_valid);
let output_other = dir.as_ref().join(format!("Invalid_{}.fq.zst", read.read_id));
let output_other = open_file_for_write(output_other, None, None, 8)?;
let mut output_other = fastq::io::Writer::new(output_other);
if let Some(mut reader) = read.open() {
let regions: Vec<_> = index.index.iter().map(|(region_id, _, range)| {
let region = self.library_spec.get(region_id).unwrap();
(region.read().unwrap(), range)
}).collect();
let mut validators: Vec<_> = regions.iter().map(|(region, range)| {
ReadValidator::new(region)
.with_range(range.start as usize ..range.end as usize)
.with_strand(read.strand)
}).collect();
let output_other = open_file_for_write(output_other, Some(Compression::Zstd), Some(9), 8)?;
let output_other = fastq::io::Writer::new(output_other);

reader.records().try_for_each(|record| {
let record = record?;
let valid = validators.iter_mut().all(|validator| {
match validator.validate(record.sequence()) {
ValidateResult::OnlistFail | ValidateResult::Valid => true,
_ => false,
}
});
if valid {
output_valid.write_record(&record)?;
} else {
output_other.write_record(&record)?;
}
anyhow::Ok(())
})?;
anyhow::Ok((output_valid, output_other))
}).collect::<Result<_>>()?;

loop {
let records: Vec<_> = readers.iter_mut().flat_map(|reader| {
let mut record = fastq::Record::default();
if reader.read_record(&mut record).unwrap() == 0 {
None
} else {
Some(record)
}
}).collect();

if records.len() != readers.len() {
break;
}

let valid = records.par_iter().zip(validators.par_iter_mut()).all(|(record, validators)|
validators.iter_mut().all(|validator| {
match validator.validate(record.sequence()) {
ValidateResult::OnlistFail | ValidateResult::Valid => true,
_ => false,
}
})
);

records.iter().zip(outputs.iter_mut()).try_for_each(|(record, (output_valid, output_other))| {
if valid {
output_valid.write_record(record)?;
} else {
output_other.write_record(record)?;
}
anyhow::Ok(())
})?;
}
Ok(())
}
Expand Down

0 comments on commit d59119c

Please sign in to comment.