Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get cds overlap given chromosome, start, + stop #217

Merged
merged 11 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ pyliftover = "*"
pandas = "*"
hgvs = "*"
"biocommons.seqrepo" = "*"
pydantic = "*"
pydantic = "==1.*"
fastapi = "*"
uvicorn = "*"
gene-normalizer = ">=0.1.34, != 0.2.0, != 0.2.1, != 0.2.2, != 0.2.3, != 0.2.4, != 0.2.5, != 0.2.6, != 0.2.7, != 0.2.8"
gene-normalizer = "==0.1.*"
"ga4gh.vrs" = "*"
"ga4gh.vrsatile.pydantic" = "==0.0.*"

[dev-packages]
cool_seq_tool = {editable = true, path = "."}
Expand Down
38 changes: 21 additions & 17 deletions cool_seq_tool/data/data_downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,38 @@ def __init__(self) -> None:
"""Initialize downloadable data locations."""
self._data_dir = APP_ROOT / "data"

def get_mane_summary(self) -> Path:
"""Identify latest MANE summary data. If unavailable locally, download from
source.
def get_mane(self, is_summary: bool = True) -> Path:
"""Identify latest MANE summary or refseq gff data. If unavailable locally,
download from source.

:param is_summary: `True` if getting summary data. `False` if getting refseq
gff data.
:return: path to MANE summary file
"""
fn_end = ".summary.txt.gz" if is_summary else ".refseq_genomic.gff.gz"

with FTP("ftp.ncbi.nlm.nih.gov") as ftp:
ftp.login()
ftp.cwd("/refseq/MANE/MANE_human/current")
files = ftp.nlst()
mane_summary_file = \
[f for f in files if f.endswith(".summary.txt.gz")]
if not mane_summary_file:
raise Exception("Unable to download MANE summary data")
mane_summary_file = mane_summary_file[0]
self._mane_summary_path = \
self._data_dir / mane_summary_file[:-3]
mane_data_path = self._data_dir / mane_summary_file
if not self._mane_summary_path.exists():
logger.info("Downloading MANE summary file from NCBI.")

mane_file = [f for f in files if f.endswith(fn_end)]
if not mane_file:
raise Exception(f"Unable to find MANE {fn_end[1:]} data")
mane_file = mane_file[0]
self._mane_path = \
self._data_dir / mane_file[:-3]
mane_data_path = self._data_dir / mane_file
if not self._mane_path.exists():
logger.info(f"Downloading {mane_file}...")
with open(mane_data_path, "wb") as fp:
ftp.retrbinary(f"RETR {mane_summary_file}", fp.write)
ftp.retrbinary(f"RETR {mane_file}", fp.write)
with gzip.open(mane_data_path, "rb") as f_in:
with open(self._mane_summary_path, "wb") as f_out:
with open(self._mane_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
remove(mane_data_path)
logger.info("MANE summary file download complete.")
return self._mane_summary_path
logger.info(f"{mane_file} download complete.")
return self._mane_path

def get_lrg_refseq_gene_data(self) -> Path:
"""Identify latest LRG RefSeq Gene file. If unavailable locally, download from
Expand Down
1 change: 1 addition & 0 deletions cool_seq_tool/data_sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
from .gene_normalizer import GeneNormalizer
from .mane_transcript import MANETranscript
from .alignment_mapper import AlignmentMapper
from .feature_overlap import FeatureOverlap
238 changes: 238 additions & 0 deletions cool_seq_tool/data_sources/feature_overlap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
"""Module for getting feature (gene/exon) overlap"""
import re
from pathlib import Path
from typing import Dict, Optional

import pandas as pd
from ga4gh.core import ga4gh_identify
from ga4gh.vrs import models

from cool_seq_tool.data_sources import SeqRepoAccess
from cool_seq_tool.paths import MANE_REFSEQ_GFF_PATH
from cool_seq_tool.schemas import Assembly, CdsOverlap, ResidueMode


# Pattern for chromosome
CHR_PATTERN = r"X|Y|([1-9]|1[0-9]|2[0-2])"


class FeatureOverlapError(Exception):
"""Custom exception for the Feature Overlap class"""


class FeatureOverlap:
"""The class for getting feature overlap"""

def __init__(
self,
seqrepo_access: SeqRepoAccess,
mane_refseq_gff_path: Path = MANE_REFSEQ_GFF_PATH,
) -> None:
"""Initialize the FeatureOverlap class. Will load RefSeq data and store as df.

:param seqrepo_access: Client for accessing SeqRepo data
:param mane_refseq_gff_path: Path to the MANE RefSeq GFF file
"""
self.seqrepo_access = seqrepo_access
self.mane_refseq_gff_path = mane_refseq_gff_path
self.df = self._load_mane_refseq_gff_data()

def _load_mane_refseq_gff_data(self) -> pd.core.frame.DataFrame:
"""Load MANE RefSeq GFF data file into DataFrame.

:return: DataFrame containing MANE RefSeq GFF data for CDS. Columns include
`type`, `chromosome` (chromosome without 'chr' prefix), `cds_start`,
`cds_stop`, `info_name` (name of record), and `gene`. `cds_start` and
`cds_stop` use inter-residue coordinates.
"""
df = pd.read_csv(
self.mane_refseq_gff_path,
sep="\t",
header=None,
skiprows=9,
usecols=[0, 2, 3, 4, 8],
)
df.columns = ["chromosome", "type", "cds_start", "cds_stop", "info"]

# Restrict to only feature of interest: CDS (which has gene info)
df = df[df["type"] == "CDS"].copy()

# Get name from the info field
df["info_name"] = df["info"].apply(
lambda info: re.findall("Name=([^;]+)", info)[0]
)

# Get gene from the info field
df["gene"] = df["info"].apply(lambda info: re.findall("gene=([^;]+)", info)[0])

# Get chromosome names without prefix and without suffix for alternate
# transcripts
df["chromosome"] = df["chromosome"].apply(
lambda chromosome: chromosome.strip("chr").split("_")[0]
)
df["chromosome"] = df["chromosome"].astype(str)

# Convert start and stop to ints
df["cds_start"] = df["cds_start"].astype(int)
df["cds_stop"] = df["cds_stop"].astype(int)

# Convert to inter-residue coordinates
df["cds_start"] -= 1

# Only retain certain columns
df = df[["type", "chromosome", "cds_start", "cds_stop", "info_name", "gene"]]

return df

def _get_chr_from_alt_ac(self, identifier: str) -> str:
"""Get chromosome given genomic identifier

:param identifier: Genomic identifier on GRCh38 assembly
:raises FeatureOverlapError: If unable to find associated GRCh38 chromosome
:return: Chromosome. 1..22, X, Y. No 'chr' prefix.
"""
aliases, error_msg = self.seqrepo_access.translate_identifier(
identifier, Assembly.GRCH38.value
)

if error_msg:
raise FeatureOverlapError(str(error_msg))

if not aliases:
raise FeatureOverlapError(
f"Unable to find {Assembly.GRCH38.value} aliases for: {identifier}"
)

assembly_chr_pattern = rf"^{Assembly.GRCH38.value}:(?P<chromosome>{CHR_PATTERN})$" # noqa: E501
for a in aliases:
chr_match = re.match(assembly_chr_pattern, a)
if chr_match:
break

if not chr_match:
raise FeatureOverlapError(
f"Unable to find {Assembly.GRCH38.value} chromosome for: {identifier}"
)

chr_groupdict = chr_match.groupdict()
return chr_groupdict["chromosome"]

def get_grch38_mane_gene_cds_overlap(
self,
start: int,
end: int,
chromosome: Optional[str] = None,
identifier: Optional[str] = None,
residue_mode: ResidueMode = ResidueMode.RESIDUE,
) -> Optional[Dict[str, CdsOverlap]]:
"""Given GRCh38 genomic data, find the overlapping MANE features (gene and cds)

:param start: GRCh38 start position
:param end: GRCh38 end position
:param chromosome: Chromosome. 1..22, X, or Y. If not provided, must provide
`identifier`. If both `chromosome` and `identifier` are provided,
`chromosome` will be used.
:param identifier: Genomic identifier on GRCh38 assembly. If not provided, must
provide `chromosome`. If both `chromosome` and `identifier` are provided,
`chromosome` will be used.
:param residue_mode: Residue mode for `start` and `end`
:raise FeatureOverlapError: If missing required fields or unable to find
associated ga4gh identifier
:return: MANE feature (gene/cds) overlap data represented as a dict
{
gene: {
'cds': VRS Sequence Location
'overlap': VRS Sequence Location
}
}
"""
ga4gh_seq_id = None
if chromosome:
if not re.match(f"^{CHR_PATTERN}$", chromosome):
raise FeatureOverlapError("`chromosome` must be 1, ..., 22, X, or Y")
else:
if identifier:
chromosome = self._get_chr_from_alt_ac(identifier)
if identifier.startswith("ga4gh:SQ."):
ga4gh_seq_id = identifier
else:
raise FeatureOverlapError(
"Must provide either `chromosome` or `identifier`"
)

# Convert residue to inter-residue
if residue_mode == ResidueMode.RESIDUE:
start -= 1

# Get feature dataframe (df uses inter-residue)
feature_df = self.df[
(self.df["chromosome"] == chromosome)
& (self.df["cds_start"] <= end) # noqa: W503
& (self.df["cds_stop"] >= start) # noqa: W503
].copy()

if feature_df.empty:
return None

# Add overlap columns
feature_df["overlap_start"] = feature_df["cds_start"].apply(
lambda x: x if start <= x else start
)
feature_df["overlap_stop"] = feature_df["cds_stop"].apply(
lambda x: end if end <= x else x
)

# Get ga4gh identifier for chromosome
if not ga4gh_seq_id:
grch38_chr = f"{Assembly.GRCH38.value}:{chromosome}"
ga4gh_aliases, error_msg = self.seqrepo_access.translate_identifier(
grch38_chr, "ga4gh"
)

# Errors should never happen but catching just in case
if error_msg:
raise FeatureOverlapError(str(error_msg))
elif not ga4gh_aliases:
raise FeatureOverlapError(
f"Unable to find ga4gh identifier for: {grch38_chr}"
)

ga4gh_seq_id = ga4gh_aliases[0]

def _get_seq_loc(start_pos: int, stop_pos: int, ga4gh_sequence_id: str) -> Dict:
"""Get VRS Sequence Location represented as a dict

:param start_pos: Start position
:param stop_pos: Stop position
:param ga4gh_sequence_id: ga4gh sequence identifier
:return: VRS Sequence Location represented as dictionary with the ga4gh ID
included
"""
_sl = models.SequenceLocation(
sequence_id=ga4gh_sequence_id,
interval=models.SequenceInterval(
start=models.Number(value=start_pos),
end=models.Number(value=stop_pos),
),
)
_sl._id = ga4gh_identify(_sl)
return _sl.as_dict()

resp = {}
for gene, group in feature_df.groupby("gene"):
_gene_overlap_data = []

for cds_row in group.itertuples():
_gene_overlap_data.append(
CdsOverlap(
cds=_get_seq_loc(
cds_row.cds_start, cds_row.cds_stop, ga4gh_seq_id
),
overlap=_get_seq_loc(
cds_row.overlap_start, cds_row.overlap_stop, ga4gh_seq_id
),
).dict(by_alias=True)
)
resp[gene] = _gene_overlap_data

return resp
8 changes: 7 additions & 1 deletion cool_seq_tool/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@

d = DataDownload()

provided_mane_refseq_gff_path = environ.get("MANE_REFSEQ_GFF_PATH")
if provided_mane_refseq_gff_path:
MANE_REFSEQ_GFF_PATH = Path(provided_mane_refseq_gff_path)
else:
MANE_REFSEQ_GFF_PATH = d.get_mane(is_summary=False)

provided_mane_summary_path = environ.get("MANE_SUMMARY_PATH", "")
if provided_mane_summary_path:
MANE_SUMMARY_PATH = Path(provided_mane_summary_path)
else:
MANE_SUMMARY_PATH = d.get_mane_summary()
MANE_SUMMARY_PATH = d.get_mane(is_summary=True)

provided_lrg_refseq_path = environ.get("LRG_REFSEQGENE_PATH", "")
if provided_lrg_refseq_path:
Expand Down
41 changes: 41 additions & 0 deletions cool_seq_tool/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re
from typing import Literal, Optional, List, Tuple, Union, Dict, Any, Type

from ga4gh.vrsatile.pydantic.vrs_models import SequenceLocation
from pydantic import BaseModel, root_validator, validator
from pydantic.main import Extra
from pydantic.types import StrictStr, StrictInt
Expand Down Expand Up @@ -616,3 +617,43 @@ def schema_extra(schema: Dict[str, Any],
"url": "https://github.com/GenomicMedLab/cool-seq-tool"
}
}


class CdsOverlap(BaseModelForbidExtra):
"""Create model for representing CDS start/stop and Overlap start/stop"""

cds: SequenceLocation
overlap: SequenceLocation

class Config(BaseModelForbidExtra.Config):
"""Configure model."""

@staticmethod
def schema_extra(schema: Dict[str, Any], model: Type["CdsOverlap"]) -> None:
"""Configure OpenAPI schema."""
if "title" in schema.keys():
schema.pop("title", None)
for prop in schema.get("properties", {}).values():
prop.pop("title", None)
schema["example"] = {
"cds": {
"_id": "ga4gh:VSL._H2ST69A4RkWCSRHOoMv-edt-R45fPdq",
"type": "SequenceLocation",
"sequence_id": "ga4gh:SQ.F-LrLMe1SRpfUZHkQmvkVKFEGaoDeHul",
"interval": {
"type": "SequenceInterval",
"start": {"value": 140726493, "type": "Number"},
"end": {"value": 140726516, "type": "Number"},
},
},
"overlap": {
"_id": "ga4gh:VSL._H2ST69A4RkWCSRHOoMv-edt-R45fPdq",
"type": "SequenceLocation",
"sequence_id": "ga4gh:SQ.F-LrLMe1SRpfUZHkQmvkVKFEGaoDeHul",
"interval": {
"type": "SequenceInterval",
"start": {"value": 140726493, "type": "Number"},
"end": {"value": 140726516, "type": "Number"},
},
}
}
2 changes: 1 addition & 1 deletion cool_seq_tool/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.14-dev1"
__version__ = "0.1.14-dev2"
Loading