advanced logging configuration (#584)
* implement dictconfig
* change logging format and logger names
* shorten logger names
* remove queuehandler and own listener process 
---------

Co-authored-by: dtrai2 <95028228+dtrai2@users.noreply.github.com>
ekneg54 and dtrai2 authored May 13, 2024
1 parent 9b406c4 commit 4508d24
Showing 32 changed files with 421 additions and 191 deletions.
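
At the heart of the change, Logprep's logging setup now goes through `logging.config.dictConfig` (see `run_logprep.py` below). The actual `DEFAULT_LOG_CONFIG` lives in `logprep.util.defaults` and is not part of this diff; the sketch below only illustrates the kind of configuration the new code applies, with formatter, date format, and per-logger levels chosen for the example:

```python
import logging.config

# Illustrative stand-in for logprep.util.defaults.DEFAULT_LOG_CONFIG;
# the real keys and values are not shown in this commit.
EXAMPLE_LOG_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "logprep": {
            "format": "%(asctime)-15s %(name)-12s %(levelname)-8s %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "logprep"}
    },
    "root": {"level": "INFO", "handlers": ["console"]},
    # fine-grained levels per logger instance
    "loggers": {
        "Runner": {"level": "DEBUG"},
        "opensearch": {"level": "ERROR"},
        "elasticsearch": {"level": "ERROR"},
    },
}

logging.config.dictConfig(EXAMPLE_LOG_CONFIG)
logging.getLogger("Runner").debug("visible: Runner is set to DEBUG")
logging.getLogger("opensearch").warning("suppressed: below ERROR")
```
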
1 change: 1 addition & 0 deletions .gitignore
@@ -24,3 +24,4 @@ dist/
error_file
experiments
**/_static/*.xlsx
logprep.log
1 change: 1 addition & 0 deletions .pylintrc
@@ -13,6 +13,7 @@ disable=too-few-public-methods
[DESIGN]
min-public-methods=1
max-public-methods=40
max-attributes=12


[CLASSES]
9 changes: 7 additions & 2 deletions CHANGELOG.md
@@ -6,16 +6,21 @@

### Features

* add fine-grained logger configuration for OpenSearch/ElasticSearch libraries
* add gzip handling to `http_input` connector
* adds advanced logging configuration
* add configurable log format
* add configurable datetime format in logs
* makes `hostname` available in custom log formats
* add fine-grained log level configuration for every logger instance
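
One of the new features above is making `hostname` available in custom log formats. How Logprep injects it is not visible in this diff; the following is a minimal sketch of one standard-library way to provide such a field (the filter name and the format string are illustrative):

```python
import logging
import socket

class HostnameFilter(logging.Filter):
    """Attach the machine's hostname to each record so '%(hostname)s' can be used."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.hostname = socket.gethostname()
        return True

handler = logging.StreamHandler()
handler.addFilter(HostnameFilter())
handler.setFormatter(
    logging.Formatter(
        "%(asctime)s %(hostname)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
logging.getLogger().addHandler(handler)
logging.getLogger("demo").warning("hostname is now part of the log format")
```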

### Improvements

* rename `logprep.event_generator` module to `logprep.generator`
* shorten logger instance names

### Bugfix

* fixes exposing OpenSearch/ElasticSearch stacktraces in log when errors happen
* fixes exposing OpenSearch/ElasticSearch stacktraces in log when errors happen by making loglevel configurable for loggers `opensearch` and `elasticsearch`

## 11.2.1

4 changes: 0 additions & 4 deletions logprep/connector/elasticsearch/output.py
@@ -104,9 +104,6 @@ class Config(Output.Config):
flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60)
"""(Optional) Timeout after :code:`message_backlog` is flushed if
:code:`message_backlog_size` is not reached."""
loglevel: Optional[str] = field(validator=validators.instance_of(str), default="INFO")
"""(Optional) Log level for the underlying library. Enables fine-grained control over the
logging, e.g. stacktraces can be activated or deactivated. Defaults to :code:`INFO`."""

__slots__ = ["_message_backlog", "_size_error_pattern"]

@@ -172,7 +169,6 @@ def _search_context(self) -> search.Elasticsearch:
elasticsearch.Elasticsearch
the elasticsearch context
"""
logging.getLogger("elasticsearch").setLevel(self._config.loglevel)
return search.Elasticsearch(
self._config.hosts,
scheme=self.schema,
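
With the per-output `loglevel` option gone (here and in the OpenSearch output below), the library loggers can instead be tuned through the central logging configuration. A hedged sketch of the idea; the exact place where Logprep applies this is not shown in the diff:

```python
import logging.config

# Illustrative: silence library stacktraces centrally instead of per output.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "loggers": {
        "opensearch": {"level": "ERROR"},
        "elasticsearch": {"level": "ERROR"},
    },
})
```
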
3 changes: 1 addition & 2 deletions logprep/connector/http/input.py
@@ -433,7 +433,6 @@ def setup(self):
raise FatalInputError(
self, "Necessary instance attribute `pipeline_index` could not be found."
)

self._logger.debug(
f"HttpInput Connector started on target {self.target} and "
f"queue {id(self.messages)} "
@@ -462,7 +461,7 @@ def setup(self):

app = self._get_asgi_app(endpoints_config)
self.http_server = http.ThreadingHTTPServer(
self._config.uvicorn_config, app, daemon=False, logger_name="Logprep HTTPServer"
self._config.uvicorn_config, app, daemon=False, logger_name="HTTPServer"
)
self.http_server.start()

1 change: 0 additions & 1 deletion logprep/connector/opensearch/output.py
@@ -100,7 +100,6 @@ class Config(ElasticsearchOutput.Config):

@cached_property
def _search_context(self):
logging.getLogger("opensearch").setLevel(self._config.loglevel)
return search.OpenSearch(
self._config.hosts,
scheme=self.schema,
2 changes: 1 addition & 1 deletion logprep/factory.py
@@ -19,7 +19,7 @@
class Factory:
"""Create components for logprep."""

_logger: "Logger" = logging.getLogger(__name__)
_logger: "Logger" = logging.getLogger("Factory")

@classmethod
def create(cls, configuration: dict, logger: "Logger") -> "Component":
16 changes: 4 additions & 12 deletions logprep/framework/pipeline.py
@@ -91,9 +91,6 @@ class Metrics(Component.Metrics):
_logprep_config: Configuration
""" the logprep configuration dict """

_log_queue: multiprocessing.Queue
""" the handler for the logs """

_continue_iterating: Value
""" a flag to signal if iterating continues """

@@ -160,15 +157,10 @@ def _input(self) -> Input:
return Factory.create(input_connector_config, self.logger)

def __init__(
self,
config: Configuration,
pipeline_index: int = None,
log_queue: multiprocessing.Queue = None,
lock: Lock = None,
self, config: Configuration, pipeline_index: int = None, lock: Lock = None
) -> None:
self._log_queue = log_queue
self.logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
self.logger.addHandler(logging.handlers.QueueHandler(log_queue))
self.logger = logging.getLogger("Pipeline")
self.logger.name = f"Pipeline{pipeline_index}"
self._logprep_config = config
self._timeout = config.timeout
self._continue_iterating = Value(c_bool)
@@ -207,7 +199,7 @@ def _create_processor(self, entry: dict) -> "Processor":
self.logger.debug(f"Created '{processor}' processor")
return processor

def run(self) -> None:
def run(self) -> None: # pylint: disable=method-hidden
"""Start processing processors in the Pipeline."""
with self._continue_iterating.get_lock():
self._continue_iterating.value = True
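
The pipeline now derives its logger from the shared `Pipeline` logger and then renames the instance with its index. A small sketch of what that idiom does (the index and format are illustrative):

```python
import logging

logging.basicConfig(format="%(name)s %(levelname)s: %(message)s")

pipeline_index = 2  # illustrative
logger = logging.getLogger("Pipeline")      # shared configuration entry point
logger.name = f"Pipeline{pipeline_index}"   # records now report "Pipeline2" as %(name)s
logger.warning("started")                   # -> "Pipeline2 WARNING: started"
```

Because every pipeline runs in its own process, renaming the single `Pipeline` logger object does not clash between pipelines; within one process, later `getLogger("Pipeline")` calls return the already renamed instance.
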
42 changes: 2 additions & 40 deletions logprep/framework/pipeline_manager.py
@@ -17,16 +17,6 @@
from logprep.util.configuration import Configuration


def logger_process(queue: multiprocessing.queues.Queue, logger: logging.Logger):
"""Process log messages from a queue."""

while True:
message = queue.get()
if message is None:
break
logger.handle(message)


class PipelineManager:
"""Manage pipelines via multi-processing."""

@@ -60,9 +50,8 @@ class Metrics(Component.Metrics):

def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self._logger = logging.getLogger("Logprep PipelineManager")
self._logger = logging.getLogger("Manager")
if multiprocessing.current_process().name == "MainProcess":
self._start_multiprocess_logger()
self._set_http_input_queue(configuration)
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration
@@ -86,25 +75,6 @@ def _set_http_input_queue(self, configuration):
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)

def _start_multiprocess_logger(self):
self.log_queue = multiprocessing.Queue(-1)
self._log_process = multiprocessing.Process(
target=logger_process, args=(self.log_queue, self._logger), daemon=True
)
self._log_process.start()

def get_count(self) -> int:
"""Get the pipeline count.
Parameters
----------
count : int
The pipeline count will be incrementally changed until it reaches this value.
"""
self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
return len(self._pipelines)

def set_count(self, count: int):
"""Set the pipeline count.
@@ -161,9 +131,6 @@ def stop(self):
self._decrease_to_count(0)
if self.prometheus_exporter:
self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
self.log_queue.put(None) # signal the logger process to stop
self._log_process.join()
self.log_queue.close()

def restart(self):
"""Restarts all pipelines"""
@@ -175,12 +142,7 @@ def restart(self):
self.prometheus_exporter.run()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(
pipeline_index=index,
config=self._configuration,
log_queue=self.log_queue,
lock=self._lock,
)
pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock)
self._logger.info("Created new pipeline")
process = multiprocessing.Process(target=pipeline.run, daemon=True)
process.stop = pipeline.stop
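
The dedicated logger process and its `multiprocessing.Queue` plumbing are removed. Presumably each pipeline process now writes through handlers it inherits when it is forked from the parent that already ran `dictConfig`. A minimal sketch of that pattern; the config and names are illustrative, not Logprep's actual setup, and the inheritance relies on fork-style process creation:

```python
import logging
import logging.config
import multiprocessing


def pipeline_run(index: int) -> None:
    # a forked child inherits the parent's logging configuration,
    # so it logs directly instead of sending records to a shared queue
    logging.getLogger(f"Pipeline{index}").info("processing events")


if __name__ == "__main__":
    logging.config.dictConfig({
        "version": 1,
        "handlers": {"console": {"class": "logging.StreamHandler"}},
        "root": {"level": "INFO", "handlers": ["console"]},
    })
    processes = [
        multiprocessing.Process(target=pipeline_run, args=(i,), daemon=True)
        for i in range(2)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```
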
2 changes: 1 addition & 1 deletion logprep/metrics/exporter.py
@@ -15,7 +15,7 @@ class PrometheusExporter:

def __init__(self, configuration: MetricsConfig):
self.is_running = False
logger_name = "Prometheus Exporter"
logger_name = "Exporter"
self._logger = getLogger(logger_name)
self._logger.debug("Initializing Prometheus Exporter")
self.configuration = configuration
39 changes: 13 additions & 26 deletions logprep/run_logprep.py
@@ -1,6 +1,7 @@
# pylint: disable=logging-fstring-interpolation
"""This module can be used to start the logprep."""
import logging
import logging.config
import os
import signal
import sys
@@ -12,21 +13,17 @@
from logprep.generator.http.controller import Controller
from logprep.generator.kafka.run_load_tester import LoadTester
from logprep.runner import Runner
from logprep.util import defaults
from logprep.util.auto_rule_tester.auto_rule_corpus_tester import RuleCorpusTester
from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester
from logprep.util.configuration import Configuration, InvalidConfigurationError
from logprep.util.defaults import DEFAULT_LOG_CONFIG
from logprep.util.helper import get_versions_string, print_fcolor
from logprep.util.rule_dry_runner import DryRunner

warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)

logging.getLogger("filelock").setLevel(logging.ERROR)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)


logging.config.dictConfig(DEFAULT_LOG_CONFIG)
logger = logging.getLogger("logprep")
EPILOG_STR = "Check out our docs at https://logprep.readthedocs.io/en/latest/"


@@ -35,17 +32,13 @@ def _print_version(config: "Configuration") -> None:
sys.exit(0)


def _get_logger(logger_config: dict) -> logging.Logger:
log_level = logger_config.get("level", "INFO")
logging.basicConfig(level=log_level, format=defaults.DEFAULT_LOG_FORMAT)
logger = logging.getLogger("Logprep")
logger.setLevel(log_level)
return logger


def _get_configuration(config_paths: list[str]) -> Configuration:
try:
return Configuration.from_sources(config_paths)
config = Configuration.from_sources(config_paths)
config.logger.setup_logging()
logger = logging.getLogger("root") # pylint: disable=redefined-outer-name
logger.info(f"Log level set to '{logging.getLevelName(logger.level)}'")
return config
except InvalidConfigurationError as error:
print(f"InvalidConfigurationError: {error}", file=sys.stderr)
sys.exit(1)
@@ -80,8 +73,6 @@ def run(configs: tuple[str], version=None) -> None:
configuration = _get_configuration(configs)
if version:
_print_version(configuration)
logger = _get_logger(configuration.logger)
logger.info(f"Log level set to '{logging.getLevelName(logger.level)}'")
for version in get_versions_string(configuration).split("\n"):
logger.info(version)
logger.debug(f"Metric export enabled: {configuration.metrics.enabled}")
@@ -150,7 +141,7 @@ def dry_run(configs: tuple[str], events: str, input_type: str, full_output: bool
"""
config = _get_configuration(configs)
json_input = input_type == "json"
dry_runner = DryRunner(events, config, full_output, json_input, logging.getLogger("DryRunner"))
dry_runner = DryRunner(events, config, full_output, json_input)
dry_runner.run()


@@ -270,7 +261,7 @@ def generate_kafka(config, file):
@click.option(
"--loglevel",
help="Sets the log level for the logger.",
type=click.Choice(logging._levelToName.values()),
type=click.Choice(logging._levelToName.values()), # pylint: disable=protected-access
required=False,
default="INFO",
)
@@ -286,12 +277,8 @@ def generate_http(**kwargs):
Generates events based on templated sample files stored inside a dataset directory.
The events will be sent to a http endpoint.
"""
log_level = kwargs.get("loglevel")
logging.basicConfig(
level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("generator")
logger.info(f"Log level set to '{log_level}'")
generator_logger = logging.getLogger("Generator")
generator_logger.info(f"Log level set to '{logging.getLevelName(generator_logger.level)}'")
generator = Controller(**kwargs)
generator.run()

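
`run_logprep` keeps routing Python warnings into the logging system via `logging.captureWarnings(True)`. With a dict-based configuration, those captured warnings can be tuned like any other logger through the `py.warnings` entry. A small, illustrative sketch, not taken from `DEFAULT_LOG_CONFIG`:

```python
import logging.config
import warnings

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "root": {"level": "INFO", "handlers": ["console"]},
    # warnings captured by logging.captureWarnings(True) arrive on this logger
    "loggers": {"py.warnings": {"level": "ERROR"}},
})
logging.captureWarnings(True)
warnings.simplefilter("always", DeprecationWarning)
warnings.warn("deprecated API", DeprecationWarning)  # dropped: WARNING < ERROR
```
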
12 changes: 7 additions & 5 deletions logprep/runner.py
@@ -130,7 +130,7 @@ def get_runner(configuration: Configuration) -> "Runner":
def __init__(self, configuration: Configuration) -> None:
self._configuration = configuration
self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"})
self._logger = logging.getLogger("Logprep Runner")
self._logger = logging.getLogger("Runner")

self._manager = PipelineManager(configuration)
self.scheduler = Scheduler()
@@ -147,15 +147,17 @@ def start(self):
self._manager.restart()
self._logger.info("Startup complete")
self._logger.debug("Runner iterating")
self._iterate()
self._logger.info("Shutting down")
self._manager.stop()
self._logger.info("Shutdown complete")

def _iterate(self):
for _ in self._keep_iterating():
if self._exit_received:
break
self.scheduler.run_pending()
self._manager.restart_failed_pipeline()
self._logger.info("Shutting down")
self._logger.info("Initiated shutdown")
self._manager.stop()
self._logger.info("Shutdown complete")

def reload_configuration(self):
"""Reloads the configuration"""