Skip to content

Commit

Permalink
export metrics via uvicorn asgi app (#576)
Browse files Browse the repository at this point in the history
* extract ThreadingHTTPServer to separate module `logprep.util.http`
* add security best practices
* update changelog

---------

Co-authored-by: dtrai2 <95028228+dtrai2@users.noreply.github.com>
  • Loading branch information
ekneg54 and dtrai2 authored Apr 29, 2024
1 parent 3376751 commit 0e5198d
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 176 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Features

* expose metrics via uvicorn webserver
* makes all uvicorn configuration options possible
* add security best practices to server configuration

### Improvements

### Bugfix
Expand Down
182 changes: 41 additions & 141 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,16 @@
* Responds with 405
"""

import inspect
import logging
import multiprocessing as mp
import queue
import re
import threading
from abc import ABC
from base64 import b64encode
from logging import Logger
from typing import Callable, Mapping, Tuple, Union

import falcon.asgi
import msgspec
import uvicorn
from attrs import define, field, validators
from falcon import ( # pylint: disable=no-name-in-module
HTTP_200,
Expand All @@ -100,14 +96,9 @@
)

from logprep.abc.input import FatalInputError, Input
from logprep.util import defaults
from logprep.util import http
from logprep.util.credentials import CredentialsFactory

uvicorn_parameter_keys = inspect.signature(uvicorn.Config).parameters.keys()
UVICORN_CONFIG_KEYS = [
parameter for parameter in uvicorn_parameter_keys if parameter not in ["app", "log_level"]
]


def decorator_basic_auth(func: Callable):
"""Decorator to check basic authentication.
Expand Down Expand Up @@ -272,113 +263,6 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
self.messages.put({**event, **metadata}, block=False)


class ThreadingHTTPServer: # pylint: disable=too-many-instance-attributes
"""Singleton Wrapper Class around Uvicorn Thread that controls
lifecycle of Uvicorn HTTP Server. During Runtime this singleton object
is stateful and therefore we need to check for some attributes during
__init__ when multiple consecutive reconfigurations are happening.
Parameters
----------
connector_config: Input.Config
Holds full connector config for config change checks
endpoints_config: dict
Endpoint paths as key and initiated endpoint objects as
value
log_level: str
Log level to be set for uvicorn server
"""

_instance = None
_lock = threading.Lock()

def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
if not cls._instance:
cls._instance = super(ThreadingHTTPServer, cls).__new__(cls)
return cls._instance

def __init__(
self,
connector_config: Input.Config,
endpoints_config: dict,
log_level: str,
) -> None:
"""Creates object attributes with necessary configuration.
As this class creates a singleton object, the existing server
will be stopped and restarted on consecutively creations"""
super().__init__()

self.connector_config = connector_config
self.endpoints_config = endpoints_config
self.log_level = log_level

if hasattr(self, "thread"):
if self.thread.is_alive(): # pylint: disable=access-member-before-definition
self._stop()
self._start()

def _start(self):
"""Collect all configs, initiate application server and webserver
and run thread with uvicorn+falcon http server and wait
until it is up (started)"""
self.uvicorn_config = self.connector_config.uvicorn_config
self._init_web_application_server(self.endpoints_config)
log_config = self._init_log_config()
self.compiled_config = uvicorn.Config(
**self.uvicorn_config,
app=self.app,
log_level=self.log_level,
log_config=log_config,
)
self.server = uvicorn.Server(self.compiled_config)
self._override_runtime_logging()
self.thread = threading.Thread(daemon=False, target=self.server.run)
self.thread.start()
while not self.server.started:
continue

def _stop(self):
"""Stop thread with uvicorn+falcon http server, wait for uvicorn
to exit gracefully and join the thread"""
if self.thread.is_alive():
self.server.should_exit = True
while self.thread.is_alive():
continue
self.thread.join()

def _init_log_config(self) -> dict:
"""Use for Uvicorn same log formatter like for Logprep"""
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["default"]["fmt"] = defaults.DEFAULT_LOG_FORMAT
log_config["formatters"]["access"]["fmt"] = defaults.DEFAULT_LOG_FORMAT
log_config["handlers"]["default"]["stream"] = "ext://sys.stdout"
return log_config

def _override_runtime_logging(self):
"""Uvicorn doesn't provide API to change name and handler beforehand
needs to be done during runtime"""
http_server_name = logging.getLogger("Logprep").name + " HTTPServer"
for logger_name in ["uvicorn", "uvicorn.access"]:
logging.getLogger(logger_name).removeHandler(logging.getLogger(logger_name).handlers[0])
logging.getLogger(logger_name).addHandler(
logging.getLogger("Logprep").parent.handlers[0]
)
logging.getLogger("uvicorn.access").name = http_server_name
logging.getLogger("uvicorn.error").name = http_server_name

def _init_web_application_server(self, endpoints_config: dict) -> None:
"Init falcon application server and setting endpoint routes"
self.app = falcon.asgi.App() # pylint: disable=attribute-defined-outside-init
for endpoint_path, endpoint in endpoints_config.items():
self.app.add_sink(endpoint, prefix=route_compile_helper(endpoint_path))

def shut_down(self):
"""Shutdown method to trigger http server shutdown externally"""
self._stop()


class HttpConnector(Input):
"""Connector to accept log messages as http post requests"""

Expand All @@ -390,15 +274,32 @@ class Config(Input.Config):
validator=[
validators.instance_of(dict),
validators.deep_mapping(
key_validator=validators.in_(UVICORN_CONFIG_KEYS),
# lamba xyz tuple necessary because of input structure
key_validator=validators.in_(http.UVICORN_CONFIG_KEYS),
# lambda xyz tuple necessary because of input structure
value_validator=lambda x, y, z: True,
),
]
)

"""Configure uvicorn server. For possible settings see
`uvicorn settings page <https://www.uvicorn.org/settings>`_.
.. security-best-practice::
:title: Uvicorn Webserver Configuration
:location: uvicorn_config
:suggested-value: uvicorn_config.access_log: true, uvicorn_config.server_header: false, uvicorn_config.data_header: false
Additionaly to the below it is recommended to configure `ssl on the metrics server endpoint
<https://www.uvicorn.org/settings/#https>`_
.. code-block:: yaml
uvicorn_config:
access_log: true
server_header: false
date_header: false
workers: 2
"""
endpoints: Mapping[str, str] = field(
validator=[
Expand Down Expand Up @@ -457,15 +358,14 @@ class Config(Input.Config):

def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Logger) -> None:
super().__init__(name, configuration, logger)
internal_uvicorn_config = {
"lifespan": "off",
"loop": "asyncio",
"timeout_graceful_shutdown": 5,
}
self._config.uvicorn_config.update(internal_uvicorn_config)
self.port = self._config.uvicorn_config["port"]
self.host = self._config.uvicorn_config["host"]
self.target = f"http://{self.host}:{self.port}"
port = self._config.uvicorn_config["port"]
host = self._config.uvicorn_config["host"]
ssl_options = any(
setting for setting in self._config.uvicorn_config if setting.startswith("ssl")
)
schema = "https" if ssl_options else "http"
self.target = f"{schema}://{host}:{port}"
self.http_server = None

def setup(self):
"""setup starts the actual functionality of this connector.
Expand Down Expand Up @@ -501,11 +401,19 @@ def setup(self):
self.messages, collect_meta, metafield_name, credentials
)

self.http_server = ThreadingHTTPServer( # pylint: disable=attribute-defined-outside-init
connector_config=self._config,
endpoints_config=endpoints_config,
log_level=self._logger.level,
app = self._get_asgi_app(endpoints_config)
self.http_server = http.ThreadingHTTPServer(
self._config.uvicorn_config, app, daemon=False, logger_name="Logprep HTTPServer"
)
self.http_server.start()

@staticmethod
def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:
"""Init falcon application server and setting endpoint routes"""
app = falcon.asgi.App()
for endpoint_path, endpoint in endpoints_config.items():
app.add_sink(endpoint, prefix=route_compile_helper(endpoint_path))
return app

def _get_event(self, timeout: float) -> Tuple:
"""Returns the first message from the queue"""
Expand All @@ -516,16 +424,8 @@ def _get_event(self, timeout: float) -> Tuple:
except queue.Empty:
return None, None

def get_app_instance(self):
"""Return app instance from webserver thread"""
return self.http_server.app

def get_server_instance(self):
"""Return server instance from webserver thread"""
return self.http_server.server

def shut_down(self):
"""Raises Uvicorn HTTP Server internal stop flag and waits to join"""
if not hasattr(self, "http_server"):
if self.http_server is None:
return
self.http_server.shut_down()
25 changes: 17 additions & 8 deletions logprep/metrics/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,29 @@
import shutil
from logging import getLogger

from prometheus_client import REGISTRY, multiprocess, start_http_server
from prometheus_client import REGISTRY, make_asgi_app, multiprocess

from logprep.util import http
from logprep.util.configuration import MetricsConfig


class PrometheusExporter:
"""Used to control the prometheus exporter and to manage the metrics"""

def __init__(self, status_logger_config: MetricsConfig):
def __init__(self, configuration: MetricsConfig):
self.is_running = False
self._logger = getLogger("Prometheus Exporter")
logger_name = "Prometheus Exporter"
self._logger = getLogger(logger_name)
self._logger.debug("Initializing Prometheus Exporter")
self.configuration = status_logger_config
self._port = status_logger_config.port
self.configuration = configuration
self._port = configuration.port
self._app = make_asgi_app(REGISTRY)
self._server = http.ThreadingHTTPServer(
configuration.uvicorn_config | {"port": self._port},
self._app,
daemon=True,
logger_name=logger_name,
)

def _prepare_multiprocessing(self):
"""
Expand All @@ -36,7 +45,7 @@ def cleanup_prometheus_multiprocess_dir(self):
os.remove(os.path.join(root, file))
for directory in dirs:
shutil.rmtree(os.path.join(root, directory), ignore_errors=True)
self._logger.info("Cleaned up %s" % multiprocess_dir)
self._logger.info("Cleaned up %s", multiprocess_dir)

def mark_process_dead(self, pid):
"""
Expand All @@ -53,6 +62,6 @@ def mark_process_dead(self, pid):
def run(self):
"""Starts the default prometheus http endpoint"""
self._prepare_multiprocessing()
start_http_server(self._port)
self._logger.info(f"Prometheus Exporter started on port {self._port}")
self._server.start()
self._logger.info("Prometheus Exporter started on port %s", self._port)
self.is_running = True
40 changes: 38 additions & 2 deletions logprep/util/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@
from logprep.factory import Factory
from logprep.factory_error import FactoryError, InvalidConfigurationError
from logprep.processor.base.exceptions import InvalidRuleDefinitionError
from logprep.util import getter
from logprep.util import getter, http
from logprep.util.credentials import CredentialsEnvNotFoundError, CredentialsFactory
from logprep.util.defaults import (
DEFAULT_CONFIG_LOCATION,
Expand Down Expand Up @@ -306,6 +306,17 @@ class MetricsConfig:

enabled: bool = field(validator=validators.instance_of(bool), default=False)
port: int = field(validator=validators.instance_of(int), default=8000)
uvicorn_config: dict = field(
validator=[
validators.instance_of(dict),
validators.deep_mapping(
key_validator=validators.in_(http.UVICORN_CONFIG_KEYS),
# lambda xyz tuple necessary because of input structure
value_validator=lambda x, y, z: True,
),
],
factory=dict,
)


@define(kw_only=True)
Expand Down Expand Up @@ -397,7 +408,32 @@ class Configuration:
converter=lambda x: MetricsConfig(**x) if isinstance(x, dict) else x,
eq=False,
)
"""Metrics configuration. Defaults to :code:`{"enabled": False, "port": 8000}`."""
"""Metrics configuration. Defaults to
:code:`{"enabled": False, "port": 8000, "uvicorn_config": {}}`.
The key :code:`uvicorn_config` can be configured with any uvicorn config parameters.
For further information see the `uvicorn documentation <https://www.uvicorn.org/settings/>`_.
.. security-best-practice::
:title: Metrics Configuration
:location: config.metrics.uvicorn_config
:suggested-value: metrics.uvicorn_config.access_log: true, metrics.uvicorn_config.server_header: false, metrics.uvicorn_config.data_header: false
Additionaly to the below it is recommended to configure `ssl on the metrics server endpoint
<https://www.uvicorn.org/settings/#https>`_
.. code-block:: yaml
metrics:
enabled: true
port: 9000
uvicorn_config:
access_log: true
server_header: false
date_header: false
workers: 1
"""
profile_pipelines: bool = field(default=False, eq=False)
"""Start the profiler to profile the pipeline. Defaults to :code:`False`."""
print_auto_test_stack_trace: bool = field(default=False, eq=False)
Expand Down
Loading

0 comments on commit 0e5198d

Please sign in to comment.