Align processor architecture (#583)
* Align processor architecture
* Update changelog
* Clear extra_data in process instead of creating new empty list

---------

Co-authored-by: ekneg54 <ekneg54@pm.me>
ppcad and ekneg54 authored May 23, 2024
1 parent 8329dbf commit 3c22b7c
Showing 93 changed files with 1,012 additions and 740 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -5,11 +5,29 @@

### Breaking

* `pseudonymizer` change rule config field `pseudonyms` to `mapping`
* `clusterer` change rule config field `target` to `source_fields`
* `generic_resolver` change rule config field `append_to_list` to `extend_target_list`
* `hyperscan_resolver` change rule config field `append_to_list` to `extend_target_list`
* `calculator` now adds the error tag `_calculator_missing_field_warning` to the event's tags field instead of `_calculator_failure` in case of missing fields in events
* `domain_label_extractor` now writes `_domain_label_extractor_missing_field_warning` tag to event tags in case of missing fields
* `geoip_enricher` now writes `_geoip_enricher_missing_field_warning` tag to event tags in case of missing fields
* `grokker` now writes `_grokker_missing_field_warning` tag to event tags instead of `_grokker_failure` in case of missing fields
* `requester` now writes `_requester_missing_field_warning` tag to event tags instead of `_requester_failure` in case of missing fields
* `timestamp_differ` now writes `_timestamp_differ_missing_field_warning` tag to event tags instead of `_timestamp_differ_failure` in case of missing fields
* `timestamper` now writes `_timestamper_missing_field_warning` tag to event tags instead of `_timestamper_failure` in case of missing fields

### Features

### Improvements

* remove logger from Components and Factory signatures
* align processor architecture to use methods like `write_to_target`, `add_field_to` and `get_dotted_field_value` when reading from and writing to events
  * this required substantial refactoring of the `hyperscan_resolver`, `generic_resolver` and `template_replacer`
* change the `pseudonymizer`, `pre_detector` and `selective_extractor` processors and the `pipeline` to handle `extra_data` the same way
* refactor the `clusterer`, `pre_detector` and `pseudonymizer` processors and change the `rule_tree` so that the processors do not require a `process` override
  * this required substantial refactoring of the `clusterer`
* handle missing fields in processors via `_handle_missing_fields` from the `field_manager`
* add `LogprepMPQueueListener` to outsource logging to a separate process
* add a single `QueueHandler` to the root logger to ensure all logs are handled by the `LogprepMPQueueListener`

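The rule-config renames above are mechanical but breaking for existing rule files. As a hedged sketch, here is what the `pseudonymizer` rename looks like, written as the dict form of a rule file; the filter and mapping values are invented for illustration:

```python
# Hypothetical pseudonymizer rule before and after this commit: only the
# config key changes ("pseudonyms" -> "mapping"); filter and values are made up.
old_rule = {
    "filter": "winlog.event_id: 1234",
    "pseudonymizer": {"pseudonyms": {"winlog.user": "RE_WHOLE_FIELD"}},
}
new_rule = {
    "filter": "winlog.event_id: 1234",
    "pseudonymizer": {"mapping": {"winlog.user": "RE_WHOLE_FIELD"}},
}

assert old_rule["pseudonymizer"].keys() == {"pseudonyms"}
assert new_rule["pseudonymizer"].keys() == {"mapping"}
```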
7 changes: 6 additions & 1 deletion logprep/abc/processor.py
@@ -3,7 +3,7 @@
 import logging
 from abc import abstractmethod
 from pathlib import Path
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, Tuple

 from attr import define, field, validators

@@ -76,13 +76,15 @@ class Config(Component.Config):
"_event",
"_specific_tree",
"_generic_tree",
"_extra_data",
]

rule_class: "Rule"
has_custom_tests: bool
_event: dict
_specific_tree: RuleTree
_generic_tree: RuleTree
_extra_data: List[Tuple[dict, Tuple[dict]]]
_strategy = None

def __init__(self, name: str, configuration: "Processor.Config"):
@@ -102,6 +104,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
             specific_rules_targets=self._config.specific_rules,
         )
         self.has_custom_tests = False
+        self._extra_data = []

     @property
     def _specific_rules(self):
@@ -153,9 +156,11 @@ def process(self, event: dict):
         A dictionary representing a log event.
         """
+        self._extra_data.clear()
         logger.debug(f"{self.describe()} processing event {event}")
         self._process_rule_tree(event, self._specific_tree)
         self._process_rule_tree(event, self._generic_tree)
+        return self._extra_data if self._extra_data else None

     def _process_rule_tree(self, event: dict, tree: "RuleTree"):
         applied_rules = set()
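The new `_extra_data` attribute gives every processor the same result shape: a list of `(document, outputs)` pairs, where `outputs` is a tuple of dicts mapping an output connector name to a target. A minimal sketch of the `process` return contract; the document contents and the `opensearch`/`pseudonyms` names are assumptions for illustration:

```python
from typing import List, Optional, Tuple

# One entry pairs a generated document with the outputs it should go to.
extra_data: List[Tuple[dict, Tuple[dict]]] = [
    ({"pseudonym": "xyz", "origin": "winlog.user"}, ({"opensearch": "pseudonyms"},)),
]

def process(event: dict) -> Optional[List[Tuple[dict, Tuple[dict]]]]:
    # stand-in for the rule tree walk that may append entries to extra_data
    return extra_data if extra_data else None

assert process({"message": "x"}) == extra_data
```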
12 changes: 4 additions & 8 deletions logprep/framework/pipeline.py
@@ -279,14 +279,10 @@ def process_event(self, event: dict):

     def _store_extra_data(self, extra_data: List[tuple]) -> None:
         self.logger.debug("Storing extra data")
-        if isinstance(extra_data, tuple):
-            documents, outputs = extra_data
-            for document in documents:
-                for output in outputs:
-                    for output_name, topic in output.items():
-                        self._output[output_name].store_custom(document, topic)
-            return
-        list(map(self._store_extra_data, extra_data))
+        for document, outputs in extra_data:
+            for output in outputs:
+                for output_name, target in output.items():
+                    self._output[output_name].store_custom(document, target)

     def _shut_down(self) -> None:
         try:
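Because every processor now returns that same shape, the recursive `isinstance` juggling collapses into one flat loop. A runnable sketch of the new consumption path with a stub output connector; the `opensearch` name and the target value are assumptions:

```python
class StubOutput:
    """Stands in for a real logprep output connector."""

    def store_custom(self, document: dict, target: str) -> None:
        print(f"storing {document} in {target}")

outputs = {"opensearch": StubOutput()}
extra_data = [({"pseudonym": "xyz"}, ({"opensearch": "pseudonyms"},))]

# mirrors the new loop body of _store_extra_data
for document, doc_outputs in extra_data:
    for output in doc_outputs:
        for output_name, target in output.items():
            outputs[output_name].store_custom(document, target)
```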
4 changes: 2 additions & 2 deletions logprep/framework/rule_tree/rule_tree.py
@@ -168,7 +168,7 @@ def _add_parsed_rule(self, parsed_rule: list):

         return current_node

-    def get_rule_id(self, rule: "Rule") -> int:
+    def get_rule_id(self, rule: "Rule") -> Optional[int]:
         """Returns ID of given rule.
         This function returns the ID of a given rule. It is used by the processors to get the ID of
@@ -185,7 +185,7 @@ def get_rule_id(self, rule: "Rule") -> int:
             The rule's ID.
         """
-        return self._rule_mapping[rule]
+        return self._rule_mapping.get(rule)

     def get_matching_rules(self, event: dict) -> List["Rule"]:
         """Get all rules in the tree that match given event.
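`get_rule_id` now degrades to `None` for a rule that lives in the other tree instead of raising `KeyError`, which is what the clusterer's new `_is_new_tree_iteration` (further down) relies on when it probes both trees. A toy illustration with a stand-in mapping:

```python
from typing import Optional

_rule_mapping = {"rule_a": 0, "rule_b": 1}  # toy stand-in for the real mapping

def get_rule_id(rule: str) -> Optional[int]:
    return _rule_mapping.get(rule)

assert get_rule_id("rule_b") == 1
assert get_rule_id("unknown_rule") is None  # previously raised KeyError
```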
7 changes: 4 additions & 3 deletions logprep/processor/amides/processor.py
@@ -99,17 +99,18 @@
 from logprep.processor.amides.detection import MisuseDetector, RuleAttributor
 from logprep.processor.amides.normalize import CommandLineNormalizer
 from logprep.processor.amides.rule import AmidesRule
+from logprep.processor.field_manager.processor import FieldManager
 from logprep.util.getter import GetterFactory
 from logprep.util.helper import get_dotted_field_value

 logger = logging.getLogger("Amides")


-class Amides(Processor):
+class Amides(FieldManager):
     """Proof-of-concept implementation of the Adaptive Misuse Detection System (AMIDES)."""

     @define(kw_only=True)
-    class Config(Processor.Config):
+    class Config(FieldManager.Config):
         """Amides processor configuration class."""

         max_cache_entries: int = field(default=1048576, validator=validators.instance_of(int))
@@ -232,7 +233,7 @@ def _load_and_unpack_models(self):

     def _apply_rules(self, event: dict, rule: AmidesRule):
         cmdline = get_dotted_field_value(event, rule.source_fields[0])
-        if not cmdline:
+        if self._handle_missing_fields(event, rule, rule.source_fields, [cmdline]):
             return

         self.metrics.total_cmdlines += 1
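The `if not cmdline` check becomes a `_handle_missing_fields` call that tags the event and tells the caller to bail out. A simplified sketch of that guard pattern — the helper and the `_amides_missing_field_warning` tag below are stand-ins modeled on the changelog's naming scheme, not the actual `FieldManager` implementation:

```python
def handle_missing_fields(event: dict, source_fields: list, values: list) -> bool:
    """Simplified stand-in: tag the event and report whether a field is missing."""
    if any(value is None for value in values):
        event.setdefault("tags", []).append("_amides_missing_field_warning")
        return True
    return False

event = {"some": "event"}  # no command-line field present
cmdline = event.get("process.command_line")  # flat lookup stands in for the dotted one
if handle_missing_fields(event, ["process.command_line"], [cmdline]):
    pass  # skip rule application; the warning tag is already set

assert event["tags"] == ["_amides_missing_field_warning"]
```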
24 changes: 14 additions & 10 deletions logprep/processor/calculator/processor.py
@@ -30,28 +30,32 @@

 from pyparsing import ParseException

-from logprep.abc.processor import Processor
 from logprep.processor.calculator.fourFn import BNF
 from logprep.processor.calculator.rule import CalculatorRule
+from logprep.processor.field_manager.processor import FieldManager
 from logprep.util.decorators import timeout
 from logprep.util.helper import get_source_fields_dict


-class Calculator(Processor):
+class Calculator(FieldManager):
     """A Processor to calculate with and without field values"""

     rule_class = CalculatorRule

     def _apply_rules(self, event, rule):
         source_field_dict = get_source_fields_dict(event, rule)
-        if not self._has_missing_values(event, rule, source_field_dict):
-            expression = self._template(rule.calc, source_field_dict)
-            try:
-                result = self._calculate(event, rule, expression)
-                if result is not None:
-                    self._write_target_field(event, rule, result)
-            except TimeoutError as error:
-                self._handle_warning_error(event, rule, error)
+        if self._handle_missing_fields(event, rule, rule.source_fields, source_field_dict.values()):
+            return
+        if self._has_missing_values(event, rule, source_field_dict):
+            return
+
+        expression = self._template(rule.calc, source_field_dict)
+        try:
+            result = self._calculate(event, rule, expression)
+            if result is not None:
+                self._write_target_field(event, rule, result)
+        except TimeoutError as error:
+            self._handle_warning_error(event, rule, error)

     @cached_property
     def bnf(self) -> BNF:
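The calculator's nested conditional flips into guard clauses: missing fields first, then missing values, then the actual calculation. A toy version of that control flow, with simplified checks standing in for the real helpers:

```python
from typing import Optional

def apply_calculation(source_fields: dict) -> Optional[float]:
    if any(v is None for v in source_fields.values()):  # missing-field guard
        return None
    if any(v == "" for v in source_fields.values()):  # missing-value guard
        return None
    return sum(float(v) for v in source_fields.values())  # stand-in "expression"

assert apply_calculation({"a": "1", "b": "2"}) == 3.0
assert apply_calculation({"a": None}) is None
```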
107 changes: 70 additions & 37 deletions logprep/processor/clusterer/processor.py
@@ -40,22 +40,23 @@
 .. automodule:: logprep.processor.clusterer.rule
 """

-from logging import Logger
-from typing import List
+import math
+from typing import Tuple

-from attr import define, field, validators
+from attrs import define, field, validators

-from logprep.abc.processor import Processor
-from logprep.processor.base.rule import Rule
 from logprep.processor.clusterer.rule import ClustererRule
 from logprep.processor.clusterer.signature_calculation.signature_phase import (
     LogRecord,
     SignatureEngine,
     SignaturePhaseStreaming,
 )
+from logprep.processor.field_manager.processor import FieldManager
+from logprep.util.helper import add_field_to, get_dotted_field_value


-class Clusterer(Processor):
+class Clusterer(FieldManager):
     """Cluster log events using a heuristic."""

     @define(kw_only=True)
@@ -65,43 +66,41 @@ class Config(Processor.Config):
         output_field_name: str = field(validator=validators.instance_of(str))
         """defines in which field results of the clustering should be stored."""

-    __slots__ = ["matching_rules", "sps", "_output_field_name"]
-
-    matching_rules: List[Rule]
+    __slots__ = ["sps"]

     sps: SignaturePhaseStreaming

     rule_class = ClustererRule

     def __init__(self, name: str, configuration: Processor.Config):
-        super().__init__(name, configuration)
-        self.matching_rules = []
+        super().__init__(name=name, configuration=configuration)
         self.sps = SignaturePhaseStreaming()
         self.has_custom_tests = True
-
-    def process(self, event: dict):
-        self.matching_rules = []
-        super().process(event)
-        if self._is_clusterable(event):
-            self._cluster(event, self.matching_rules)
+        self._last_rule_id = math.inf
+        self._last_non_extracted_signature = None

     def _apply_rules(self, event, rule):
-        self.matching_rules.append(rule)
+        source_field_values = self._get_field_values(event, rule.source_fields)
+        self._handle_missing_fields(event, rule, rule.source_fields, source_field_values)
+        if self._is_clusterable(event, rule.source_fields[0]):
+            self._cluster(event, rule)

-    def _is_clusterable(self, event: dict):
+    def _is_clusterable(self, event: dict, source_field: str) -> bool:
         # The following blocks have not been extracted into functions for performance reasons
-        # A message can only be clustered if it exists, despite any other condition
-        if "message" not in event:
-            return False
-        if event["message"] is None:
+        # A message can only be clustered if it exists or if a clustering step exists
+        message = get_dotted_field_value(event, source_field)
+        if message is None and self._last_non_extracted_signature is None:
             return False

         # Return clusterable state if it exists, since it can be true or false
-        if "clusterable" in event:
-            return event["clusterable"]
+        clusterable = get_dotted_field_value(event, "clusterable")
+        if clusterable is not None:
+            return clusterable

         # Alternatively, check for a clusterable tag
-        if "tags" in event and "clusterable" in event["tags"]:
+        tags = get_dotted_field_value(event, "tags")
+        if tags and "clusterable" in tags:
             return True

         # It is clusterable if a syslog with PRI exists even if no clusterable field exists
@@ -114,28 +113,62 @@

     @staticmethod
     def _syslog_has_pri(event: dict):
-        return (
-            "syslog" in event
-            and "facility" in event["syslog"]
-            and "event" in event
-            and "severity" in event["event"]
-        )
+        syslog_value = get_dotted_field_value(event, "syslog")
+        event_value = get_dotted_field_value(event, "event")
+        return not (syslog_value is None or event_value is None)

+    def _cluster(self, event: dict, rule: ClustererRule):
+        raw_text, sig_text = self._get_text_to_cluster(rule, event)
+        if raw_text is None:
+            return
+
+        record = (
+            LogRecord(raw_text=raw_text, sig_text=sig_text)
+            if sig_text
+            else LogRecord(raw_text=raw_text)
+        )

-    def _cluster(self, event: dict, rules: List[ClustererRule]):
-        cluster_signature_based_on_message = self.sps.run(
-            LogRecord(raw_text=event["message"]), rules
-        )
+        cluster_signature_based_on_message, sig_text = self.sps.run(record, rule)
         if self._syslog_has_pri(event):
             cluster_signature = " , ".join(
                 [
-                    str(event["syslog"]["facility"]),
-                    str(event["event"]["severity"]),
+                    str(get_dotted_field_value(event, "syslog.facility")),
+                    str(get_dotted_field_value(event, "event.severity")),
                     cluster_signature_based_on_message,
                 ]
             )
         else:
             cluster_signature = cluster_signature_based_on_message
-        event[self._config.output_field_name] = cluster_signature
+        add_field_to(
+            event,
+            self._config.output_field_name,
+            cluster_signature,
+            extends_lists=rule.extend_target_list,
+            overwrite_output_field=rule.overwrite_target,
+        )
+        self._last_non_extracted_signature = sig_text
+
+    def _is_new_tree_iteration(self, rule: ClustererRule) -> bool:
+        for tree in (self._specific_tree, self._generic_tree):
+            rule_id = tree.get_rule_id(rule)
+            if rule_id is None:
+                continue
+            is_new_iteration = rule_id <= self._last_rule_id
+            self._last_rule_id = rule_id
+            return is_new_iteration
+        return True
+
+    def _get_text_to_cluster(self, rule: ClustererRule, event: dict) -> Tuple[str, str]:
+        sig_text = None
+        if self._is_new_tree_iteration(rule):
+            self._last_non_extracted_signature = None
+        else:
+            sig_text = self._last_non_extracted_signature
+        if sig_text is None:
+            raw_text = get_dotted_field_value(event, rule.source_fields[0])
+        else:
+            raw_text = sig_text
+        return raw_text, sig_text

     def test_rules(self):
         results = {}
@@ -144,7 +177,7 @@ def test_rules(self):
             results[rule_repr] = []
             try:
                 for test in rule.tests:
-                    result = SignatureEngine.apply_signature_rule(rule, test["raw"])
+                    result = SignatureEngine.apply_signature_rule(test["raw"], rule)
                     expected_result = test["result"]
                     results[rule_repr].append((result, expected_result))
             except AttributeError:
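The rewritten clusterer drops its `process` override by threading one piece of state across `_apply_rules` calls: `_last_rule_id` starts at `math.inf`, and a rule ID that does not increase signals that the rule tree started over on a new event. A toy model of that detection:

```python
import math

class IterationTracker:
    """Toy model: rule IDs grow while one event walks the rule tree, so a
    non-increasing ID means the walk restarted with the next event."""

    def __init__(self) -> None:
        self.last_rule_id = math.inf

    def is_new_tree_iteration(self, rule_id: int) -> bool:
        is_new = rule_id <= self.last_rule_id
        self.last_rule_id = rule_id
        return is_new

tracker = IterationTracker()
assert tracker.is_new_tree_iteration(0) is True   # first rule, new event
assert tracker.is_new_tree_iteration(3) is False  # same event, later rule
assert tracker.is_new_tree_iteration(0) is True   # IDs reset: next event began
```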
21 changes: 15 additions & 6 deletions logprep/processor/clusterer/rule.py
@@ -105,20 +105,29 @@

 from attrs import define, field, validators

-from logprep.processor.base.rule import Rule
+from logprep.processor.field_manager.rule import FieldManagerRule


-class ClustererRule(Rule):
+class ClustererRule(FieldManagerRule):
     """Check if documents match a filter."""

     @define(kw_only=True)
-    class Config(Rule.Config):
+    class Config(FieldManagerRule.Config):
         """RuleConfig for Clusterer"""

-        target: str = field(validator=validators.instance_of(str))
-        """Defines which field should be used for clustering."""
+        source_fields: list = field(
+            validator=[
+                validators.instance_of(list),
+                validators.deep_iterable(member_validator=validators.instance_of(str)),
+                validators.max_len(1),
+            ],
+            default=["message"],
+        )
+        """The field from where to get the value which should be clustered."""
+        overwrite_target: bool = field(validator=validators.instance_of(bool), default=True)
+        """Overwrite the target field value if exists. Defaults to :code:`True`"""
         pattern: Pattern = field(validator=validators.instance_of(Pattern), converter=re.compile)
-        """Defines the regex pattern that will be matched on the :code:`clusterer.target`."""
+        """Defines the regex pattern that will be matched on the :code:`clusterer.source_fields`."""
         repl: str = field(validator=validators.instance_of(str))
         """Anything within a capture group in :code:`clusterer.pattern` will be substituted with
         values defined in :code:`clusterer.repl`.
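Under the renamed config, a clusterer rule reads roughly as below: `source_fields` (at most one entry, defaulting to `["message"]`) replaces the removed `target` option, and `overwrite_target` now defaults to `True`. The filter, pattern, and replacement are invented for illustration:

```python
# Hypothetical clusterer rule after this commit, as the dict form of a rule file.
rule_definition = {
    "filter": "message",
    "clusterer": {
        "source_fields": ["message"],  # max one field; defaults to ["message"]
        "pattern": r"connection attempt (\d+) failed",
        "repl": "<+>\\1</+>",
    },
}

assert "target" not in rule_definition["clusterer"]  # old key is gone
```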