Skip to content

Commit

Permalink
Merge branch 'master' into message_hash
Browse files Browse the repository at this point in the history
  • Loading branch information
CameronDevine committed Sep 19, 2024
2 parents fb34085 + af8eb8d commit 86ecf4e
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Dependencies
run: python -m pip install pytest
run: python -m pip install pytest pytest-mock
- name: Install
run: pip install .
- name: Test
Expand Down
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,24 @@

# Pigeon

Pigeon is a combination of a [STOMP client](https://pypi.org/project/stomp-py/), and a message definition system using [Pydantic](https://docs.pydantic.dev/latest/) models.
Pigeon is a combination of a [STOMP client](https://pypi.org/project/stomp-py/), and a message definition system using [Pydantic](https://docs.pydantic.dev/latest/) models.

## Message Definitions

Messages are defined by writing a Pydantic model for each topic. These messages can be registered at runtime, or placed in a Python package and automatically loaded when Pigeon is imported using an [entrypoint](https://packaging.python.org/en/latest/specifications/entry-points/).

## Logging

When a Pigeon client is instantiated, a logger is created. If desired, logs can also be sent to [Grafana Loki](https://grafana.com/oss/loki/) by setting environment variables.

| Variable | Documentation |
| ------------- | ----------------------------------------------------------------------------- |
| LOKI_URL | The URL of the location of the Loki Server |
| LOKI_TAGS | A mapping using colons to split tags and values, and commas to separate pairs |
| LOKI_USERNAME | The username to use when connecting to the server |
| LOKI_PASSWORD | The password to use when connecting to the server |
| LOKI_VERSION | The version of the Loki Emitter to use |

## Templates

To ease the creation of services using Pigeon, a [Cookiecutter](https://cookiecutter.readthedocs.io/en/stable/) [template](https://github.com/AllenInstitute/pigeon-service-cookiecutter) is available. Similarly, a [template](https://github.com/AllenInstitute/pigeon-msgs-cookiecutter) for a message definition package is available.
2 changes: 1 addition & 1 deletion examples/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pigeon.client import Pigeon
from pigeon.utils import setup_logging
from pigeon.base_msg import BaseMessage
from pigeon import BaseMessage


class TestMsg(BaseMessage):
Expand Down
6 changes: 4 additions & 2 deletions examples/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from pigeon.client import Pigeon
from pigeon.utils import setup_logging
from pigeon.base_msg import BaseMessage
from pigeon import BaseMessage

logger = setup_logging("subscriber")

Expand All @@ -18,7 +18,9 @@ def handle_test_message(topic, message):
logger.info(f"Received {topic} message: {message}")


connection = Pigeon("Subscriber", host=host, port=port, logger=logger, load_topics=False)
connection = Pigeon(
"Subscriber", host=host, port=port, logger=logger, load_topics=False
)
connection.register_topic("test", TestMsg)
connection.connect(username="admin", password="password")
connection.subscribe("test", handle_test_message)
Expand Down
2 changes: 1 addition & 1 deletion pigeon/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .client import Pigeon
from .base_msg import BaseMessage
from .messages import BaseMessage
70 changes: 57 additions & 13 deletions pigeon/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
import os
import socket
import time
from importlib.metadata import entry_points
from typing import Callable, Dict

import stomp
from typing import Callable, Dict
from stomp.utils import Frame
import stomp.exception
from importlib.metadata import entry_points
from pydantic import ValidationError
from stomp.utils import Frame

from . import messages
from . import exceptions
from .utils import get_message_hash, call_with_correct_args

Expand Down Expand Up @@ -61,6 +64,31 @@ def __init__(
)
self._logger = logger if logger is not None else self._configure_logging()

self._pid = os.getpid()
self._hostname = socket.gethostname().split(".")[0]
self._name = f"{self._service}_{self._pid}_{self._hostname}"
self.register_topics(messages.core_topics)

def _announce(self, connected=True):
self.send(
"&_announce_connection",
name=self._name,
pid=self._pid,
hostname=self._hostname,
service=self._service,
connected=connected,
)

def _update_state(self):
self.send(
"&_update_state",
name=self._name,
pid=self._pid,
hostname=self._hostname,
service=self._service,
subscribed_to=list(self._callbacks.keys()),
)

@staticmethod
def _configure_logging() -> logging.Logger:
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -102,10 +130,10 @@ def connect(
Args:
username (str, optional): The username to authenticate with. Defaults to None.
password (str, optional): The password to authenticate with. Defaults to None.
retry_limit (int, optional): Number of times to attempt connection
Raises:
stomp.exception.ConnectFailedException: If the connection to the server fails.
"""
retries = 0
while retries < retry_limit:
Expand All @@ -124,6 +152,9 @@ def connect(
f"Could not connect to server: {e}"
) from e

self.subscribe("&_request_state", self._update_state)
self._announce()

def send(self, topic: str, **data):
"""
Sends data to the specified topic.
Expand All @@ -134,12 +165,15 @@ def send(self, topic: str, **data):
Raises:
exceptions.NoSuchTopicException: If the specified topic is not defined.
"""
self._ensure_topic_exists(topic)
serialized_data = self._topics[topic](**data).serialize()

headers = dict(
source=self._name,
service=self._service,
hostname=self._hostname,
pid=self._pid,
hash=self._hashes[topic],
sent_at=get_str_time_ms(),
)
Expand Down Expand Up @@ -186,36 +220,45 @@ def _handle_message(self, message_frame: Frame):
f"Callback for topic '{topic}' failed with error:", exc_info=True
)

def subscribe(self, topic: str, callback: Callable):
def subscribe(self, topic: str, callback: Callable, send_update=True):
"""
Subscribes to a topic and associates a callback function to handle incoming messages.
Args:
topic (str): The topic to subscribe to.
callback (Callable): The callback function to handle incoming
messages. It may accept up to three arguments. In order, the
arguments are, the recieved message, the topic the message was
recieved on, and the message headers.
arguments are, the received message, the topic the message was
received on, and the message headers.
Raises:
NoSuchTopicException: If the specified topic is not defined.
"""
self._ensure_topic_exists(topic)
if topic not in self._callbacks:
self._connection.subscribe(destination=topic, id=topic)
self._callbacks[topic] = callback
self._logger.info(f"Subscribed to {topic} with {callback}.")
if send_update:
self._update_state()

def subscribe_all(self, callback: Callable):
def subscribe_all(self, callback: Callable, include_core=False):
"""Subscribes to all registered topics.
Args:
callback: The function to call when a message is recieved. It must
callback: The function to call when a message is received. It must
accept two arguments, the topic and the message data.
include_core (bool): If true, subscribe all will subscribe the client to core messages.
"""

# Additional logic here is to avoid subscribe_all changing behavior and always subscribing to core topics.
for topic in self._topics:
self.subscribe(topic, callback)
if topic in messages.topics and not include_core:
continue
if topic is "&_request_state":
continue
self.subscribe(topic, callback, send_update=False)
self._update_state()

def unsubscribe(self, topic: str):
"""Unsubscribes from a given topic.
Expand All @@ -230,6 +273,7 @@ def unsubscribe(self, topic: str):
def disconnect(self):
"""Disconnect from the STOMP message broker."""
if self._connection.is_connected():
self._announce(connected=False)
self._connection.disconnect()
self._logger.info("Disconnected from STOMP server.")

Expand All @@ -239,5 +283,5 @@ def __init__(self, callback: Callable):
self.callback = callback

def on_message(self, frame):
frame.headers["recieved_at"] = get_str_time_ms()
frame.headers["received_at"] = get_str_time_ms()
self.callback(frame)
26 changes: 26 additions & 0 deletions pigeon/base_msg.py → pigeon/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,29 @@ def deserialize(cls, data: str):
An instantiation of the model using the JSON data.
"""
return cls.model_validate_json(data)


class AnnounceConnection(BaseMessage):
name: str
service: str
pid: int
hostname: str
connected: bool


class RequestState(BaseMessage): ...


class UpdateState(BaseMessage):
name: str
service: str
pid: int
hostname: str
subscribed_to: list[str]


core_topics = {
"&_announce_connection": AnnounceConnection,
"&_request_state": RequestState,
"&_update_state": UpdateState,
}
33 changes: 29 additions & 4 deletions pigeon/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,44 @@
from copy import copy
import hashlib
from typing import Callable
import os
from logging_loki import LokiQueueHandler
from multiprocessing import Queue

from .exceptions import SignatureException


def setup_logging(logger_name: str, log_level: int = logging.INFO):
logger = logging.getLogger(logger_name)
handler = logging.StreamHandler()
formatter = logging.Formatter(
stream_handler = logging.StreamHandler()
stream_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
stream_handler.setFormatter(stream_formatter)
logger.addHandler(stream_handler)
logger.setLevel(log_level)
if "LOKI_URL" in os.environ:
loki_handler = LokiQueueHandler(
Queue(-1),
url=os.environ.get("LOKI_URL"),
tags=(
dict(
[val.strip() for val in tag.split(":")]
for tag in os.environ.get("LOKI_TAGS").split(",")
)
if "LOKI_TAGS" in os.environ
else None
),
auth=(
(os.environ.get("LOKI_USERNAME"), os.environ.get("LOKI_PASSWORD"))
if "LOKI_USERNAME" in os.environ or "LOKI_PASSWORD" in os.environ
else None
),
version=os.environ.get("LOKI_VERSION"),
)
loki_formatter = logging.Formatter("%(message)s")
loki_handler.setFormatter(loki_formatter)
logger.addHandler(loki_handler)
return logger


Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pydantic
stomp-py
pyyaml
pyyaml
python-logging-loki
84 changes: 0 additions & 84 deletions tests/test_call_with_correct_args.py

This file was deleted.

Loading

0 comments on commit 86ecf4e

Please sign in to comment.