Skip to content

Commit

Permalink
make configuration immutable (#661)
Browse files Browse the repository at this point in the history
* update changelog

* make config object frozen
  • Loading branch information
ekneg54 authored Sep 10, 2024
1 parent 4cbb01d commit feb5ca0
Show file tree
Hide file tree
Showing 24 changed files with 182 additions and 90 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
### Improvements
### Bugfix

* ensure `logprep.abc.Component.Config` is immutable and can be applied multiple times

## 13.1.2
### Bugfix

Expand Down
15 changes: 10 additions & 5 deletions logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
class Component(ABC):
"""Abstract Component Class to define the Interface"""

@define(kw_only=True, slots=False)
@define(kw_only=True, slots=False, frozen=True)
class Config:
"""Common Configurations"""
"""Common Configurations
This class is used to define the configuration of the component.
It is frozen because the configuration should not be changed after initialization.
"""

type: str = field(validator=validators.instance_of(str))
"""Type of the component"""
Expand All @@ -39,12 +42,14 @@ def __attrs_post_init__(self):
attribute.init_tracker()

# __dict__ is added to support functools.cached_property
__slots__ = ["name", "_logger", "_config", "__dict__"]
__slots__ = ["name", "_config", "__dict__"]

# instance attributes
name: str
_scheduler = Scheduler()

_config: Config

# class attributes
_scheduler = Scheduler()
_decoder: msgspec.json.Decoder = msgspec.json.Decoder()
_encoder: msgspec.json.Encoder = msgspec.json.Encoder()

Expand Down
24 changes: 17 additions & 7 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ def __init__(self, name: str, configuration: "Connector.Config") -> None:
super().__init__(name, configuration)
self._last_valid_records = {}

@cached_property
def _consumer(self) -> Consumer:
"""configures and returns the consumer
@property
def _kafka_config(self) -> dict:
"""Get the kafka configuration.
Returns
-------
Consumer
confluent_kafka consumer object
dict
The kafka configuration.
"""
injected_config = {
"logger": logger,
Expand All @@ -259,8 +259,18 @@ def _consumer(self) -> Consumer:
"error_cb": self._error_callback,
}
DEFAULTS.update({"client.id": getfqdn()})
self._config.kafka_config = DEFAULTS | self._config.kafka_config
consumer = Consumer(self._config.kafka_config | injected_config)
return DEFAULTS | self._config.kafka_config | injected_config

@cached_property
def _consumer(self) -> Consumer:
"""configures and returns the consumer
Returns
-------
Consumer
confluent_kafka consumer object
"""
consumer = Consumer(self._kafka_config)
consumer.subscribe(
[self._config.topic],
on_assign=self._assign_callback,
Expand Down
18 changes: 14 additions & 4 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,26 @@ class Config(Output.Config):
"""

@property
def _kafka_config(self) -> dict:
    """Assemble the effective kafka producer configuration.

    Merged in increasing priority: the module-level ``DEFAULTS``, the
    user-supplied ``kafka_config`` from the component configuration, and
    the callbacks injected by logprep itself.

    Returns
    -------
    dict
        The kafka configuration.
    """
    # Callbacks and the logger are always supplied by logprep and must
    # win over any user-provided values, hence they are merged last.
    injected_config = {
        "logger": logger,
        "stats_cb": self._stats_callback,
        "error_cb": self._error_callback,
    }
    # NOTE(review): this mutates the module-level DEFAULTS dict as a side
    # effect of reading the property — confirm no other consumer of
    # DEFAULTS depends on "client.id" being absent.
    DEFAULTS.update({"client.id": getfqdn()})
    return DEFAULTS | self._config.kafka_config | injected_config

@cached_property
def _producer(self):
    """Create (once) and return the confluent_kafka Producer."""
    return Producer(self._kafka_config)

def _error_callback(self, error: KafkaException):
"""Callback for generic/global error events, these errors are typically
Expand Down
4 changes: 0 additions & 4 deletions logprep/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ def create(cls, configuration: dict) -> "Component":
)
if not isinstance(component_configuration_dict, dict):
raise InvalidConfigSpecificationError(component_name)
metric_labels = {}
if "metric_labels" in configuration[component_name]:
metric_labels = configuration[component_name].pop("metric_labels")
component = Configuration.get_class(component_name, component_configuration_dict)
component_configuration = Configuration.create(
component_name, component_configuration_dict
)
component_configuration.metric_labels = copy.deepcopy(metric_labels)
return component(component_name, component_configuration)
11 changes: 5 additions & 6 deletions logprep/processor/amides/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,17 +215,16 @@ def setup(self):
)

def _load_and_unpack_models(self):
if not Path(self._config.models_path).exists():
models_path = self._config.models_path
if not Path(models_path).exists():
logger.debug("Getting AMIDES models archive...")
models_archive = Path(f"{current_process().name}-{self.name}.zip")
models_archive.touch()
models_archive.write_bytes(
GetterFactory.from_string(str(self._config.models_path)).get_raw()
)
models_archive.write_bytes(GetterFactory.from_string(models_path).get_raw())
logger.debug("Finished getting AMIDES models archive...")
self._config.models_path = str(models_archive.absolute())
models_path = str(models_archive.absolute())

with ZipFile(self._config.models_path, mode="r") as zip_file:
with ZipFile(models_path, mode="r") as zip_file:
with zip_file.open("model", mode="r") as models_file:
models = joblib.load(models_file)

Expand Down
3 changes: 1 addition & 2 deletions logprep/processor/domain_label_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from logprep.processor.domain_label_extractor.rule import DomainLabelExtractorRule
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value, add_and_overwrite
from logprep.util.helper import add_and_overwrite, add_field_to, get_dotted_field_value
from logprep.util.validators import list_of_urls_validator

logger = logging.getLogger("DomainLabelExtractor")
Expand Down Expand Up @@ -101,7 +101,6 @@ def setup(self):
list_path.touch()
list_path.write_bytes(GetterFactory.from_string(tld_list).get_raw())
downloaded_tld_lists_paths.append(f"file://{str(list_path.absolute())}")
self._config.tld_lists = downloaded_tld_lists_paths
logger.debug("finished tldlists download...")

def _apply_rules(self, event, rule: DomainLabelExtractorRule):
Expand Down
1 change: 0 additions & 1 deletion logprep/processor/domain_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def setup(self):
list_path.touch()
list_path.write_bytes(GetterFactory.from_string(tld_list).get_raw())
downloaded_tld_lists_paths.append(f"file://{str(list_path.absolute())}")
self._config.tld_lists = downloaded_tld_lists_paths
logger.debug("finished tldlists download...")

def _apply_rules(self, event, rule):
Expand Down
11 changes: 6 additions & 5 deletions logprep/processor/geoip_enricher/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class Config(FieldManager.Config):

@cached_property
def _city_db(self):
return database.Reader(self._config.db_path)

def setup(self):
super().setup()
db_path = Path(self._config.db_path)
if not db_path.exists():
logger.debug("start geoip database download...")
Expand All @@ -84,8 +80,13 @@ def setup(self):
db_path_file.write_bytes(
GetterFactory.from_string(str(self._config.db_path)).get_raw()
)
db_path = str(db_path_file.absolute())
logger.debug("finished geoip database download.")
self._config.db_path = str(db_path_file.absolute())
return database.Reader(db_path)

def setup(self):
    """Run the common component setup and eagerly load the GeoIP database.

    Touching the ``_city_db`` cached property here forces the (possibly
    remote) database download to happen during setup instead of lazily
    on the first processed event.
    """
    super().setup()
    _ = self._city_db  # trigger download

def _try_getting_geoip_data(self, ip_string):
try:
Expand Down
6 changes: 3 additions & 3 deletions tests/acceptance/test_amides.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
logger = getLogger("Logprep-Test")


@pytest.fixture
@pytest.fixture(name="configuration")
def config():
config_dict = {
"process_count": 1,
Expand Down Expand Up @@ -51,9 +51,9 @@ def config():
return Configuration(**config_dict)


def test_amides(tmp_path: Path, config: Configuration):
def test_amides(tmp_path: Path, configuration: Configuration):
config_path = tmp_path / "generated_config.yml"
config_path.write_text(config.as_yaml())
config_path.write_text(configuration.as_yaml())

test_output = get_test_output(str(config_path))
test_output_documents = [event for event in test_output[0] if event.get("amides")]
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/component/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from typing import Callable, Iterable
from unittest import mock

import pytest
from attrs import asdict
from attrs.exceptions import FrozenInstanceError
from prometheus_client import Counter, Gauge, Histogram

from logprep.abc.component import Component
Expand Down Expand Up @@ -122,3 +124,7 @@ def test_all_metric_attributes_are_tested(self):
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)

def test_config_is_immutable(self):
    """Assigning to any attribute of the frozen config must fail."""
    with pytest.raises(FrozenInstanceError):
        setattr(self.object._config, "type", "new_type")
2 changes: 1 addition & 1 deletion tests/unit/connector/test_confluent_kafka_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CommonConfluentKafkaTestCase:

def test_client_id_is_set_to_hostname(self):
    """The merged kafka config must carry the local FQDN as client.id."""
    self.object.setup()
    # Read through the _kafka_config property: the frozen component
    # config itself is never mutated anymore.
    assert self.object._kafka_config.get("client.id") == getfqdn()

def test_create_fails_for_unknown_option(self):
kafka_config = deepcopy(self.CONFIG)
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,16 @@ def test_get_event_raises_exception_if_input_not_utf(self, _):
self.object._get_event(0.001)

def test_setup_raises_fatal_input_error_on_invalid_config(self):
    """An unknown kafka option must surface as FatalInputError on setup.

    The config is frozen, so a fresh connector is built from a modified
    copy of ``self.CONFIG`` instead of mutating ``self.object._config``.
    """
    kafka_config = {
        "bootstrap.servers": "testinstance:9092",
        "group.id": "sapsal",
        "myconfig": "the config",  # not a valid librdkafka property
    }
    config = deepcopy(self.CONFIG)
    config["kafka_config"] = kafka_config
    connector = Factory.create({"test": config})
    with pytest.raises(FatalInputError, match="No such configuration property"):
        connector.setup()

def test_get_next_raises_critical_input_parsing_error(self):
return_value = b'{"invalid": "json'
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ def test_store_calls_batch_finished_callback(self, _): # pylint: disable=argume
self.object.input_connector.batch_finished_callback.assert_called()

def test_setup_raises_fatal_output_error_on_invalid_config(self):
    """An unknown kafka option must surface as FatalOutputError on setup.

    The config is frozen, so a fresh connector is built from a modified
    copy of ``self.CONFIG`` instead of mutating ``self.object._config``.
    """
    kafka_config = {"myconfig": "the config", "bootstrap.servers": "testserver:9092"}
    config = deepcopy(self.CONFIG)
    config.update({"kafka_config": kafka_config})
    connector = Factory.create({"test connector": config})
    with pytest.raises(FatalOutputError, match="No such configuration property"):
        connector.setup()

def test_raises_value_error_if_mandatory_parameters_not_set(self):
config = deepcopy(self.CONFIG)
Expand Down
14 changes: 10 additions & 4 deletions tests/unit/connector/test_dummy_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ def test_fails_with_disconnected_error_if_input_was_empty(self):
self.object.get_next(self.timeout)

def test_returns_documents_in_order_provided(self):
    """Documents come back from get_next in their configured order.

    Builds a new connector from a modified config copy because the
    component configuration is immutable.
    """
    config = copy.deepcopy(self.CONFIG)
    config["documents"] = [{"order": 0}, {"order": 1}, {"order": 2}]
    self.object = Factory.create({"Test Instance Name": config})
    for order in range(0, 3):
        event, _ = self.object.get_next(self.timeout)
        assert event.get("order") == order

def test_raises_exceptions_instead_of_returning_them_in_document(self):
self.object._config.documents = [{"order": 0}, DummyError, {"order": 1}]
config = copy.deepcopy(self.CONFIG)
config["documents"] = [{"order": 0}, DummyError, {"order": 1}]
self.object = Factory.create({"Test Instance Name": config})
event, _ = self.object.get_next(self.timeout)
assert event.get("order") == 0
with raises(DummyError):
Expand All @@ -39,15 +43,17 @@ def test_raises_exceptions_instead_of_returning_them_in_document(self):
assert event.get("order") == 1

def test_raises_exceptions_instead_of_returning_them(self):
    """An exception class in the documents list is raised, not returned.

    Builds a new connector from a modified config copy because the
    component configuration is immutable.
    """
    config = copy.deepcopy(self.CONFIG)
    config["documents"] = [BaseException]
    self.object = Factory.create({"Test Instance Name": config})
    with raises(BaseException):
        self.object.get_next(self.timeout)

def test_repeat_documents_repeats_documents(self):
config = copy.deepcopy(self.CONFIG)
config["repeat_documents"] = True
config["documents"] = [{"order": 0}, {"order": 1}, {"order": 2}]
connector = Factory.create(configuration={"Test Instance Name": config})
connector._config.documents = [{"order": 0}, {"order": 1}, {"order": 2}]

for order in range(0, 9):
event, _ = connector.get_next(self.timeout)
Expand Down
8 changes: 6 additions & 2 deletions tests/unit/connector/test_jsonl_output.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# pylint: disable=missing-docstring
# pylint: disable=attribute-defined-outside-init
# pylint: disable=protected-access
import copy
import tempfile
from unittest import mock

from logprep.connector.jsonl.output import JsonlOutput
from logprep.factory import Factory
from tests.unit.connector.base import BaseOutputTestCase


Expand Down Expand Up @@ -92,8 +94,10 @@ def test_write_json_writes_to_file(self, mock_open):

@mock.patch("builtins.open")
def test_setup_creates_single_file_if_only_output_file(self, mock_open):
    """With custom/error outputs blanked, setup opens exactly one file.

    Builds a new connector from a modified config copy because the
    component configuration is immutable.
    """
    config = copy.deepcopy(self.CONFIG)
    config["output_file_custom"] = ""
    config["output_file_error"] = ""
    self.object = Factory.create({"Test Instance Name": config})
    self.object.setup()
    mock_open.assert_called()
    assert mock_open.call_count == 1
Expand Down
Loading

0 comments on commit feb5ca0

Please sign in to comment.