Skip to content

Commit

Permalink
fix multiprocessing logging (#563)
Browse files Browse the repository at this point in the history
* add run logging in extra process
* update Changelog
  • Loading branch information
ekneg54 authored Apr 12, 2024
1 parent 810e933 commit 625bd56
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Bugfix

* fixes a bug where the pipeline index increases on every restart of a failed pipeline
* fixes closed log queue issue by run logging in an extra process

## 11.0.0
### Breaking
Expand Down
24 changes: 19 additions & 5 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import logging.handlers
import multiprocessing
import multiprocessing.queues

from attr import define, field

Expand All @@ -13,7 +14,18 @@
from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.metrics import CounterMetric
from logprep.util.configuration import Configuration
from logprep.util.logging import SingleThreadQueueListener


def logger_process(queue: multiprocessing.queues.Queue, logger: logging.Logger):
"""Process log messages from a queue."""
try:
while True:
message = queue.get()
if message is None:
break
logger.handle(message)
except KeyboardInterrupt:
pass


class PipelineManager:
Expand Down Expand Up @@ -51,9 +63,10 @@ def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self._logger = logging.getLogger("Logprep PipelineManager")
self.log_queue = multiprocessing.Queue(-1)
self._queue_listener = SingleThreadQueueListener(self.log_queue)
self._queue_listener.start()

self._log_process = multiprocessing.Process(
target=logger_process, args=(self.log_queue, self._logger), daemon=True
)
self._log_process.start()
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration

Expand Down Expand Up @@ -132,7 +145,8 @@ def stop(self):
self._decrease_to_count(0)
if self.prometheus_exporter:
self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
self._queue_listener.stop()
self.log_queue.put(None) # signal the logger process to stop
self._log_process.join()
self.log_queue.close()

def restart(self):
Expand Down
84 changes: 0 additions & 84 deletions logprep/util/logging.py

This file was deleted.

10 changes: 2 additions & 8 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,10 @@ def test_prometheus_exporter_is_instanciated_if_metrics_enabled(self):
manager = PipelineManager(config)
assert isinstance(manager.prometheus_exporter, PrometheusExporter)

def test_stop_stops_queue_listener(self):
with mock.patch.object(self.manager, "_queue_listener") as _queue_listener_mock:
self.manager.stop()
_queue_listener_mock.stop.assert_called()

def test_stop_closes_log_queue(self):
with mock.patch.object(self.manager, "log_queue") as log_queue_mock:
with mock.patch.object(self.manager, "_queue_listener"):
self.manager.stop()
log_queue_mock.close.assert_called()
self.manager.stop()
log_queue_mock.close.assert_called()

def test_set_count_increases_number_of_pipeline_starts_metric(self):
self.manager.metrics.number_of_pipeline_starts = 0
Expand Down

0 comments on commit 625bd56

Please sign in to comment.