Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Oct 3, 2024
1 parent ee32dc6 commit dc80360
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 185 deletions.
6 changes: 1 addition & 5 deletions precellar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,15 @@ anyhow = "1.0"
bed-utils = "0.5.1"
bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "07eda9b9c2815ae52b3fa30b01de0e19fae31fe0" }
bstr = "1.0"
cached-path = "0.6"
either = "1.13"
flate2 = "1.0"
itertools = "0.13"
indexmap = "2.5"
log = "0.4"
lexical = "6.1"
multi_reader = "0.1"
noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "async"] }
kdam = "0.5.2"
rayon = "1.10"
smallvec = "1.13"
serde = "1.0"
seqspec = { version = "0.1", workspace = true }
regex = "1.6"
zstd = { version = "0.13", features = ["zstdmt"] }
regex = "1.6"
6 changes: 3 additions & 3 deletions precellar/src/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl<A: Alinger> FastqProcessor<A> {
if regions.is_empty() {
None
} else {
Some((read, regions, crate::io::read_fastq(read, self.base_dir.clone()).unwrap()))
Some((read, regions, read.open(self.base_dir.clone()).unwrap()))
}
});
FastqRecords::new(data)
Expand All @@ -226,7 +226,7 @@ impl<A: Alinger> FastqProcessor<A> {
.expect("No barcode region found");
let range = index.into_iter().find(|x| x.0.region_type.is_barcode()).unwrap().1;

crate::io::read_fastq(read, &self.base_dir).unwrap().records().for_each(|record| {
read.open(&self.base_dir).unwrap().records().for_each(|record| {
let mut record = record.unwrap();
record = slice_fastq_record(&record, range.start as usize, range.end as usize);
if read.is_reverse() {
Expand All @@ -249,7 +249,7 @@ impl<A: Alinger> FastqProcessor<A> {
}
let region = regions[0];
if region.sequence_type == SequenceType::Onlist {
Ok(Whitelist::new(crate::io::read_onlist(region.onlist.as_ref().unwrap())?))
Ok(Whitelist::new(region.onlist.as_ref().unwrap().read()?))
} else {
Ok(Whitelist::empty())
}
Expand Down
1 change: 0 additions & 1 deletion precellar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ pub mod barcode;
pub mod align;
pub mod fragment;
pub mod qc;
pub mod io;
pub mod utils;
4 changes: 0 additions & 4 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@ bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", r
bstr = "1.0"
cached-path = "0.6"
either = "1.13"
futures-util = "0.3"
itertools = "0.13"
noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "bgzf"] }
seqspec = { version = "0.1", workspace = true }
serde_yaml = "0.9"
termtree = "0.5"
precellar = { version = "0.1", workspace = true }
regex = "1.6"
tokio = { version = "1.40", features = ["rt-multi-thread"] }
log = "0.4"
env_logger = "0.11"
reqwest = { version = "0.12", features = ["blocking", "stream"] }
url = "2.5"

[dependencies.pyo3]
version = "0.22.3"
Expand Down
3 changes: 1 addition & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ use itertools::Itertools;
use ::precellar::{
align::{Alinger, FastqProcessor, NameCollatedRecords},
fragment::FragmentGenerator,
io::{open_file_for_write, Compression},
qc::{FragmentQC, Metrics, AlignQC},
};
use pyseqspec::SeqSpec;
use seqspec::{Assay, Modality};
use seqspec::{Assay, Modality, utils::{open_file_for_write, Compression}};

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
Expand Down
128 changes: 10 additions & 118 deletions python/src/pyseqspec.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use std::path::PathBuf;
use std::{collections::HashMap, str::FromStr};
use std::collections::HashMap;

use tokio::io::AsyncWriteExt;
use futures_util::StreamExt;
use pyo3::prelude::*;
use reqwest::header::{HeaderMap, CONTENT_DISPOSITION};
use seqspec::{Assay, File, Modality, Read, Region, Strand, UrlType};
use seqspec::{Assay, Read, Region};
use anyhow::Result;
use termtree::Tree;
use cached_path::Cache;
use url::Url;

/** A SeqSpec object.
Expand Down Expand Up @@ -79,59 +74,16 @@ impl SeqSpec {
min_len: Option<usize>,
max_len: Option<usize>,
) -> Result<()> {
let all_reads = &mut self.0.sequence_spec;
let mut read_exist = false;
let mut read_buffer = Read::default();
read_buffer.read_id = read_id.to_string();
let read = if let Some(r) = all_reads.as_mut().map(|r| r.iter_mut().find(|r| r.read_id == read_id)).flatten() {
read_exist = true;
r
let fastqs = fastq.map(|f| if f.is_instance_of::<pyo3::types::PyList>() {
f.extract::<Vec<String>>().unwrap()
} else {
&mut read_buffer
};
if !read_exist {
assert!(modality.is_some(), "modality must be provided for a new read");
assert!(primer_id.is_some(), "primer_id must be provided for a new read");
assert!(is_reverse.is_some(), "is_reverse must be provided for a new read");
}

if let Some(rev) = is_reverse {
read.strand = if rev { Strand::Neg } else { Strand::Pos };
}
if let Some(modality) = modality {
read.modality = Modality::from_str(modality)?;
}
if let Some(primer_id) = primer_id {
read.primer_id = primer_id.to_string();
}
if let Some(fastq) = fastq {
let fastq = if fastq.is_instance_of::<pyo3::types::PyList>() {
fastq.extract::<Vec<String>>()?
} else {
vec![fastq.extract::<String>()?]
};
read.files = Some(fastq.into_iter().map(|path| make_file_path(&path)).collect::<Result<Vec<File>>>()?);
}


if (min_len.is_none() || max_len.is_none()) && read.files.is_some() {
let len = precellar::io::get_read_length(&read, "./")?;
read.min_len = min_len.unwrap_or(len) as u32;
read.max_len = max_len.unwrap_or(len) as u32;
} else {
read.min_len = min_len.unwrap() as u32;
read.max_len = max_len.unwrap() as u32;
}

if !read_exist {
if let Some(r) = all_reads.as_mut() {
r.push(read_buffer);
} else {
all_reads.replace(vec![read_buffer]);
}
}
vec![f.extract::<String>().unwrap()]
});

Ok(())
self.0.update_read(
read_id, modality, primer_id, is_reverse,
fastqs.as_ref().map(|x| x.as_slice()), min_len, max_len,
)
}

/// Delete a read from the SeqSpec object.
Expand Down Expand Up @@ -214,64 +166,4 @@ fn format_read(read: &Read) -> String {
let orientation = if read.is_reverse() { "↑" } else { "↓" };
let has_files = if read.files.as_ref().map(|x| !x.is_empty()).unwrap_or(false) { "✓" } else { "✗" };
format!("{}{}({}){}", orientation, read.read_id, len, has_files)
}

fn make_file_path(path: &str) -> Result<File> {
let runtime = tokio::runtime::Runtime::new().unwrap();

let path = runtime.block_on(download_file(path))?;
let file = std::fs::File::open(&path)?;
Ok(File {
file_id: path.file_name().unwrap().to_str().unwrap().to_string(),
filename: path.file_name().unwrap().to_str().unwrap().to_string(),
filetype: "fastq".to_string(),
filesize: file.metadata()?.len(),
url: path.to_str().unwrap().to_string(),
urltype: UrlType::Local,
md5: "0".to_string(),
})
}

async fn download_file(url: &str) -> Result<PathBuf> {
if !is_url(url) {
return Ok(PathBuf::from_str(url)?)
}

let response = reqwest::get(url).await?;
let filename = get_filename(&response.headers(), url);

let mut file = tokio::fs::File::create(&filename).await?;
let mut stream = response.bytes_stream();

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
}

Ok(PathBuf::from_str(&filename)?)
}

// Function to extract filename from headers or URL
fn get_filename(headers: &HeaderMap, url: &str) -> String {
// Try to get the filename from the 'Content-Disposition' header
if let Some(content_disposition) = headers.get(CONTENT_DISPOSITION) {
if let Ok(disposition) = content_disposition.to_str() {
if let Some(filename) = disposition.split("filename=").nth(1) {
return filename.trim_matches('"').to_string();
}
}
}

// Fallback to extracting the filename from the URL
let parsed_url = url::Url::parse(url).expect("Invalid URL");
parsed_url
.path_segments()
.and_then(|segments| segments.last())
.unwrap_or("downloaded_file")
.to_string()
}

// Check if the input is a valid URL
fn is_url(input: &str) -> bool {
Url::parse(input).map(|url| url.has_host()).is_ok()
}
3 changes: 2 additions & 1 deletion python/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{io::{BufReader, BufWriter}, path::PathBuf, str::FromStr};
use precellar::{io::{open_file_for_read, open_file_for_write, Compression}, utils::strip_barcode_from_read_name};
use precellar::utils::strip_barcode_from_read_name;
use seqspec::utils::{open_file_for_read, open_file_for_write, Compression};
use noodles::fastq::{self, Reader, io::Writer};
use anyhow::Result;
use pyo3::prelude::*;
Expand Down
7 changes: 6 additions & 1 deletion seqspec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ edition = "2021"

[dependencies]
anyhow = "1.0"
cached-path = "0.6"
flate2 = "1.0"
log = "0.4"
noodles = { version = "0.80", features = ["core", "fastq", "async"] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_yaml = "0.9"
zstd = { version = "0.13", features = ["zstdmt"] }
multi_reader = "0.1"
Loading

0 comments on commit dc80360

Please sign in to comment.