Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for service revision existence #680

Merged
merged 9 commits into from
Aug 27, 2024
182 changes: 93 additions & 89 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

24 changes: 18 additions & 6 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def services_topic(self):
topic = Topic(name=OCTUE_SERVICES_PREFIX, project_name=self.backend.project_name)

if not topic.exists():
raise octue.exceptions.ServiceNotFound(f"{topic!r} cannot be found.")
raise octue.exceptions.ServiceNotFound(
f"The {topic!r} topic cannot be found. Check that it's been created for this service network."
)

return topic

Expand Down Expand Up @@ -147,7 +149,7 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow
logger.info("Starting %r.", self)

subscription = Subscription(
name=".".join((OCTUE_SERVICES_PREFIX, self._pub_sub_id)),
name=self._pub_sub_id,
topic=self.services_topic,
filter=f'attributes.recipient = "{self.id}" AND attributes.sender_type = "{PARENT_SENDER_TYPE}"',
expiration_time=None,
Expand Down Expand Up @@ -328,6 +330,8 @@ def ask(
"""
service_namespace, service_name, service_revision_tag = split_service_id(service_id)

# If using a service registry, check that the service revision is registered, or get the default service
# revision if no revision tag is provided.
if self.service_registries:
if service_revision_tag:
raise_if_revision_not_registered(sruid=service_id, service_registries=self.service_registries)
Expand All @@ -338,7 +342,17 @@ def ask(
service_registries=self.service_registries,
)

elif not service_revision_tag:
# If not using a service registry, check that the service revision exists by checking for its subscription.
elif service_revision_tag:
service_revision_subscription = Subscription(
name=convert_service_id_to_pub_sub_form(service_id),
topic=self.services_topic,
)

if not service_revision_subscription.exists():
raise octue.exceptions.ServiceNotFound(f"Service revision {service_id!r} not found.")

else:
raise octue.exceptions.InvalidServiceID(
f"A service revision tag for {service_id!r} must be provided if service registries aren't being used."
)
Expand All @@ -362,10 +376,8 @@ def ask(
if asynchronous and not push_endpoint:
answer_subscription = None
else:
pub_sub_id = convert_service_id_to_pub_sub_form(self.id)

answer_subscription = Subscription(
name=".".join((OCTUE_SERVICES_PREFIX, pub_sub_id, ANSWERS_NAMESPACE, question_uuid)),
name=".".join((self._pub_sub_id, ANSWERS_NAMESPACE, question_uuid)),
topic=self.services_topic,
filter=(
f'attributes.recipient = "{self.id}" '
Expand Down
2 changes: 1 addition & 1 deletion octue/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(
configuration_manifest=None,
children=None,
output_location=None,
use_signed_urls_for_output_datasets=True,
use_signed_urls_for_output_datasets=False,
**kwargs,
):
self.configuration_values = configuration_values
Expand Down
Loading
Loading