Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancing and tweaking the utils.question module #102

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions PLATER/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ fastapi==0.85.0
pyaml==20.4.0
pytest==7.4.3
pytest-asyncio==0.21.1
deepdiff==6.7.1
uvicorn==0.24.0
reasoner-transpiler==2.0.5
reasoner-pydantic==4.1.6
Expand Down
130 changes: 97 additions & 33 deletions PLATER/services/util/question.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List, Dict
import copy
import orjson
import time
Expand Down Expand Up @@ -26,6 +27,7 @@
config.get('logging_format'),
)


class Question:
# SPEC VARS
QUERY_GRAPH_KEY = 'query_graph'
Expand Down Expand Up @@ -61,44 +63,95 @@ def compile_cypher(self, **kwargs):
item['qualifier_type_id'] = item['qualifier_type_id'].replace('biolink:', '')
return get_query(query_graph, **kwargs)

def _construct_sources_tree(self, sources: List[Dict]) -> List[Dict]:
"""
Method to fill out the full annotation for edge "sources"
entries including "upstream_resource_ids" tree.
:param sources: List[Dict], edge 'sources' property entries
:return: enhanced "sources" including top-level "Monarch TRAPI" source entry.
"""
if not sources:
# empty sources.. pretty strange, but then just send back
# an instance of the top-level "Monarch TRAPI" source entry
return [
{
"resource_id": self.provenance,
"resource_role": "aggregator_knowledge_source",
"source_record_urls": None,
"upstream_resource_ids": None
}
]

def _construct_sources_tree(self, sources):
# if primary source and aggregator source are specified in the graph, upstream_resource_ids of all aggregator_ks
# be that source
# if primary source and aggregator source are specified in the graph,
# upstream_resource_ids of all aggregator_ks be that source

# if aggregator ks are coming from db, plater would add itself as aggregator and use other aggregator ids
# if aggregator ks are coming from db, mta would add itself as aggregator and use other aggregator ids
# as upstream resources, if no aggregators are found and only primary ks is provided that would be added
# as upstream for the plater entry
# as upstream for the mta entry
formatted_sources = []
resource_ids_with_resource_role = {}
source_record_urls_to_resource_id = {}

# filter out source entries that actually have values
temp = {}
for source in sources:
if not source['resource_id']:

if not (
'resource_id' in source and
source['resource_id'] and
'resource_role' in source and
source['resource_role']
):
# silently pruning TRAPI non-compliant source records
logger.warning(f"Invalid edge 'source' entry: '{str(source)}'? Skipped!")
continue
temp[source['resource_role']] = temp.get(source['resource_role'], set())

# 'resource_role' values are now ResourceRoleEnum without a biolink: CURIE prefix
source['resource_role'] = source['resource_role'].lstrip("biolink:")

resource_ids_with_resource_role[source['resource_role']] = \
resource_ids_with_resource_role.get(source['resource_role'], set())

source_record_urls_to_resource_id[source['resource_id']] = \
source['source_record_urls'] if 'source_record_urls' in source else None

if isinstance(source["resource_id"], str):
temp[source["resource_role"]].add(source["resource_id"])
resource_ids_with_resource_role[source["resource_role"]].add(source["resource_id"])
elif isinstance(source["resource_id"], list):
for resource_id in source["resource_id"]:
temp[source["resource_role"]].add(resource_id)
resource_ids_with_resource_role[source["resource_role"]].add(resource_id)

for resource_role in resource_ids_with_resource_role:

for resource_role in temp:
upstreams = None
if resource_role == "biolink:aggregator_knowledge_source":
upstreams = temp.get("biolink:primary_knowledge_source", None)

if resource_role == "aggregator_knowledge_source":
upstreams = resource_ids_with_resource_role.get("primary_knowledge_source", None)
elif resource_role == "primary_knowledge_source":
upstreams = resource_ids_with_resource_role.get("supporting_data_source", None)

formatted_sources += [
{"resource_id": resource_id, "resource_role": resource_role.lstrip('biolink:'), "upstream_resource_ids": upstreams}
for resource_id in temp[resource_role]
{
"resource_id": resource_id,
"resource_role": resource_role.lstrip("biolink:"),
"source_record_urls": source_record_urls_to_resource_id[resource_id],
"upstream_resource_ids": list(upstreams) if upstreams else None
}
for resource_id in resource_ids_with_resource_role[resource_role]
]
upstreams_for_plater_entry = temp.get("biolink:aggregator_knowledge_source") or temp.get("biolink:primary_knowledge_source")

upstreams_for_top_level_entry = \
resource_ids_with_resource_role.get("aggregator_knowledge_source") or \
resource_ids_with_resource_role.get("primary_knowledge_source") or \
resource_ids_with_resource_role.get("supporting_data_source")

formatted_sources.append({
"resource_id":self.provenance,
"resource_id": self.provenance,
"resource_role": "aggregator_knowledge_source",
"upstream_resource_ids": upstreams_for_plater_entry
"source_record_urls": None,
"upstream_resource_ids": list(upstreams_for_top_level_entry) if upstreams_for_top_level_entry else None
})
return formatted_sources

return formatted_sources

def format_attribute_trapi(self, kg_items, node=False):
for identifier in kg_items:
Expand All @@ -110,8 +163,10 @@ def format_attribute_trapi(self, kg_items, node=False):

# separate the qualifiers from attributes for edges and format them
if not node:
qualifier_results = [attrib for attrib in attributes
if 'qualifie' in attrib['original_attribute_name']]
qualifier_results = [
attrib for attrib in attributes
if 'original_attribute_name' in attrib and 'qualifie' in attrib['original_attribute_name']
]
if qualifier_results:
formatted_qualifiers = []
for qualifier in qualifier_results:
Expand All @@ -123,36 +178,45 @@ def format_attribute_trapi(self, kg_items, node=False):
})
props['qualifiers'] = formatted_qualifiers

# create a new list that doesnt have the core properties or qualifiers
new_attribs = [attrib for attrib in attributes
if attrib['original_attribute_name'] not in props and
attrib['original_attribute_name'] not in skip_list and
'qualifie' not in attrib['original_attribute_name']
]
# create a new list that doesn't have the core properties or qualifiers
new_attribs: List = list()
for attrib in attributes:
if 'original_attribute_name' not in attrib or (
attrib['original_attribute_name'] not in props and
attrib['original_attribute_name'] not in skip_list and
'qualifie' not in attrib['original_attribute_name']
):
new_attribs.append(attrib)

# for the non-core properties
for attr in new_attribs:
# make sure the original_attribute_name has something other than none
attr['original_attribute_name'] = attr['original_attribute_name'] or ''
attr['original_attribute_name'] = \
('original_attribute_name' in attr and attr['original_attribute_name']) or ''

# map the attribute type to the list above, otherwise generic default
attr["value_type_id"] = VALUE_TYPES.get(attr["original_attribute_name"], "EDAM:data_0006")
attr["value_type_id"] = \
("value_type_id" in attr and attr["value_type_id"]) or \
VALUE_TYPES.get(attr["original_attribute_name"], "EDAM:data_0006")

# uses generic data as attribute type id if not defined
if not ("attribute_type_id" in attr and attr["attribute_type_id"] != 'NA'):
attribute_data = get_attribute_bl_info(attr["original_attribute_name"])
if attribute_data:
attr.update(attribute_data)

# update edge provenance with automat infores, filter empty ones, expand list type resource ids
# update edge provenance with infores,
# filter empty ones, expand list type resource ids
if not node:
kg_items[identifier]["sources"] = self._construct_sources_tree(kg_items[identifier].get("sources", []))
kg_items[identifier]["sources"] = \
self._construct_sources_tree(kg_items[identifier].get("sources", []))

# assign these attribs back to the original attrib list without the core properties
props['attributes'] = new_attribs

return kg_items

def transform_attributes(self, trapi_message, graph_interface: GraphInterface):
def transform_attributes(self, trapi_message):
self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('nodes', {}), node=True)
self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('edges', {}))
for r in trapi_message.get("results", []):
Expand Down Expand Up @@ -197,7 +261,7 @@ async def answer(self, graph_interface: GraphInterface):
}
)
results_dict = graph_interface.convert_to_dict(results)
self._question_json.update(self.transform_attributes(results_dict[0], graph_interface))
self._question_json.update(self.transform_attributes(results_dict[0]))
self._question_json = Question.apply_attribute_constraints(self._question_json)
return self._question_json

Expand Down
Loading
Loading