Skip to content

Commit

Permalink
add shut_down method to opensearch output connector (#724)
Browse files Browse the repository at this point in the history
* add shut_down method to opensearch output connector

closes #537
  • Loading branch information
ekneg54 authored Dec 9, 2024
1 parent 239cb21 commit 55bf4c1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ the list is now fixed inside the packaged logprep
* fix incorrect timezones for log arrival time and delta time in input preprocessing
* fix `_get_value` in `FilterExpression` so that keys don't match on values
* fix `auto_rule_tester` to work with `LOGPREP_BYPASS_RULE_TREE` enabled
* fix `opensearch_output` not draining `message_backlog` on shutdown

## 14.0.0
### Breaking
Expand Down
4 changes: 4 additions & 0 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,7 @@ def health(self) -> bool:
self.metrics.number_of_errors += 1
return False
return super().health() and resp.get("status") in self._config.desired_cluster_status

def shut_down(self):
self._write_backlog()
return super().shut_down()
24 changes: 7 additions & 17 deletions tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,18 @@
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-arguments
import copy
import json
import os
import re
import time
import uuid
from unittest import mock

import opensearchpy as search
import pytest
from opensearchpy import OpenSearchException as SearchException
from opensearchpy import helpers

from logprep.abc.component import Component
from logprep.abc.output import CriticalOutputError, FatalOutputError
from logprep.connector.opensearch.output import OpensearchOutput
from logprep.abc.output import CriticalOutputError
from logprep.factory import Factory
from logprep.util.time import TimeParser
from tests.unit.connector.base import BaseOutputTestCase


class NotJsonSerializableMock:
pass


in_ci = os.environ.get("GITHUB_ACTIONS") == "true"

helpers.parallel_bulk = mock.MagicMock()
helpers.bulk = mock.MagicMock()


class TestOpenSearchOutput(BaseOutputTestCase):
Expand Down Expand Up @@ -173,3 +157,9 @@ def test_write_backlog_creates_failed_event(self):
self.object._write_backlog()
assert error.value.message == "failed to index"
assert error.value.raw_input == [{"errors": error_message, "event": event}]

def test_shut_down_clears_message_backlog(self):
self.object._message_backlog = [{"some": "event"}]
with mock.patch("logprep.connector.opensearch.output.OpensearchOutput._bulk"):
self.object.shut_down()
assert len(self.object._message_backlog) == 0, "Message backlog should be cleared"

0 comments on commit 55bf4c1

Please sign in to comment.