From 55bf4c1b977b69f419321ba59e2202d574d0e15a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:55:01 +0100 Subject: [PATCH] add shut_down method to opensearch output connector (#724) * add shut_down method to opensearch output connector closes #537 --- CHANGELOG.md | 1 + logprep/connector/opensearch/output.py | 4 ++++ .../unit/connector/test_opensearch_output.py | 24 ++++++------------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac45040c8..7ae95f337 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ the list is now fixed inside the packaged logprep * fix incorrect timezones for log arrival time and delta time in input preprocessing * fix `_get_value` in `FilterExpression` so that keys don't match on values * fix `auto_rule_tester` to work with `LOGPREP_BYPASS_RULE_TREE` enabled +* fix `opensearch_output` not draining `message_backlog` on shutdown ## 14.0.0 ### Breaking diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index 4dcf9c2f8..0f4c67567 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -313,3 +313,7 @@ def health(self) -> bool: self.metrics.number_of_errors += 1 return False return super().health() and resp.get("status") in self._config.desired_cluster_status + + def shut_down(self): + self._write_backlog() + return super().shut_down() diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py index ab9836042..f6a7ff118 100644 --- a/tests/unit/connector/test_opensearch_output.py +++ b/tests/unit/connector/test_opensearch_output.py @@ -5,34 +5,18 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=too-many-arguments import copy -import json -import os -import re -import time -import uuid from unittest import mock -import opensearchpy as search import pytest from opensearchpy import OpenSearchException as SearchException from opensearchpy import helpers from logprep.abc.component import Component -from logprep.abc.output import CriticalOutputError, FatalOutputError -from logprep.connector.opensearch.output import OpensearchOutput +from logprep.abc.output import CriticalOutputError from logprep.factory import Factory -from logprep.util.time import TimeParser from tests.unit.connector.base import BaseOutputTestCase - -class NotJsonSerializableMock: - pass - - -in_ci = os.environ.get("GITHUB_ACTIONS") == "true" - helpers.parallel_bulk = mock.MagicMock() -helpers.bulk = mock.MagicMock() class TestOpenSearchOutput(BaseOutputTestCase): @@ -173,3 +157,9 @@ def test_write_backlog_creates_failed_event(self): self.object._write_backlog() assert error.value.message == "failed to index" assert error.value.raw_input == [{"errors": error_message, "event": event}] + + def test_shut_down_clears_message_backlog(self): + self.object._message_backlog = [{"some": "event"}] + with mock.patch("logprep.connector.opensearch.output.OpensearchOutput._bulk"): + self.object.shut_down() + assert len(self.object._message_backlog) == 0, "Message backlog should be cleared"