Add full typing support to the library. (#87)
In detail:
 - MyPy and available type stubs added to dev dependencies.
 - Type annotations are used everywhere. Abstract types (Mapping etc.) are preferred. The library passes mypy --strict tests.
 - Minor logic changes and new asserts to support typing.
 - Introduction of abstract methods in DrainBase.
 - 'parameter_extraction_cache_capacity' is now correctly parsed as an int.
 - py.typed marker added (fixes #84).

Signed-off-by: Simon Dierl <simon.dierl@cs.tu-dortmund.de>
Co-authored-by: Superskyyy <Superskyyy@outlook.com>
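A minimal sketch (not part of this commit) of what the new annotations give a downstream caller under mypy --strict; the log lines and constructor arguments are illustrative, but Drain, add_log_message, and match are the APIs typed in this diff:

    from typing import Optional

    from drain3.drain import Drain, LogCluster

    drain = Drain(depth=4, sim_th=0.4, max_clusters=1024)

    # add_log_message is now typed as returning Tuple[LogCluster, str]
    cluster, update_type = drain.add_log_message("connected to host-01")
    print(cluster.get_template(), update_type)

    # match returns Optional[LogCluster]; mypy --strict forces the None check
    maybe: Optional[LogCluster] = drain.match("connected to host-02")
    if maybe is not None:
        print(maybe.cluster_id)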
no-preserve-root and Superskyyy authored Apr 24, 2024
1 parent 76d12de commit c7496f6
Showing 14 changed files with 509 additions and 150 deletions.
122 changes: 82 additions & 40 deletions drain3/drain.py
@@ -2,7 +2,9 @@
# This file implements the Drain algorithm for log parsing.
# Based on https://github.com/logpai/logparser/blob/master/logparser/Drain/Drain.py by LogPAI team

from typing import List, Dict, Sequence
from abc import ABC, abstractmethod
from typing import cast, Collection, IO, Iterable, MutableMapping, MutableSequence, Optional, Sequence, Tuple, \
TYPE_CHECKING, TypeVar, Union

from cachetools import LRUCache, Cache

@@ -12,28 +14,36 @@
class LogCluster:
__slots__ = ["log_template_tokens", "cluster_id", "size"]

def __init__(self, log_template_tokens: list, cluster_id: int):
def __init__(self, log_template_tokens: Iterable[str], cluster_id: int) -> None:
self.log_template_tokens = tuple(log_template_tokens)
self.cluster_id = cluster_id
self.size = 1

def get_template(self):
def get_template(self) -> str:
return ' '.join(self.log_template_tokens)

def __str__(self):
def __str__(self) -> str:
return f"ID={str(self.cluster_id).ljust(5)} : size={str(self.size).ljust(10)}: {self.get_template()}"


class LogClusterCache(LRUCache):
_T = TypeVar("_T")
if TYPE_CHECKING:
class _LRUCache(LRUCache[int, Optional[LogCluster]]):
# see https://github.com/python/mypy/issues/4148 for this hack
...
else:
_LRUCache = LRUCache

class LogClusterCache(_LRUCache):
"""
Least Recently Used (LRU) cache which allows callers to conditionally skip
cache eviction algorithm when accessing elements.
"""

def __missing__(self, key):
def __missing__(self, key: int) -> None:
return None

def get(self, key):
def get(self, key: int, _: Union[Optional[LogCluster], _T] = None) -> Optional[LogCluster]:
"""
Returns the value of the item with the specified key without updating
the cache eviction algorithm.
Expand All @@ -44,21 +54,21 @@ def get(self, key):
class Node:
__slots__ = ["key_to_child_node", "cluster_ids"]

def __init__(self):
self.key_to_child_node: Dict[str, Node] = {}
self.cluster_ids: List[int] = []
def __init__(self) -> None:
self.key_to_child_node: MutableMapping[str, Node] = {}
self.cluster_ids: Sequence[int] = []


class DrainBase:
class DrainBase(ABC):
def __init__(self,
depth=4,
sim_th=0.4,
max_children=100,
max_clusters=None,
extra_delimiters=(),
depth: int = 4,
sim_th: float = 0.4,
max_children: int = 100,
max_clusters: Optional[int] = None,
extra_delimiters: Sequence[str] = (),
profiler: Profiler = NullProfiler(),
param_str="<*>",
parametrize_numeric_tokens=True):
param_str: str = "<*>",
parametrize_numeric_tokens: bool = True) -> None:
"""
Create a new Drain instance.
@@ -91,19 +101,23 @@ def __init__(self,
self.param_str = param_str
self.parametrize_numeric_tokens = parametrize_numeric_tokens

# key: int, value: LogCluster
self.id_to_cluster = {} if max_clusters is None else LogClusterCache(maxsize=max_clusters)
self.id_to_cluster: MutableMapping[int, Optional[LogCluster]] = \
{} if max_clusters is None else LogClusterCache(maxsize=max_clusters)
self.clusters_counter = 0

@property
def clusters(self):
return self.id_to_cluster.values()
def clusters(self) -> Collection[LogCluster]:
return cast(Collection[LogCluster], self.id_to_cluster.values())

@staticmethod
def has_numbers(s):
def has_numbers(s: Iterable[str]) -> bool:
return any(char.isdigit() for char in s)

def fast_match(self, cluster_ids: Sequence, tokens: list, sim_th: float, include_params: bool):
def fast_match(self,
cluster_ids: Collection[int],
tokens: Sequence[str],
sim_th: float,
include_params: bool) -> Optional[LogCluster]:
"""
Find the best match for a log message (represented as tokens) versus a list of clusters
:param cluster_ids: List of clusters to match against (represented by their IDs)
@@ -114,7 +128,7 @@ def fast_match(self, cluster_ids: Sequence, tokens: list, sim_th: float, include
"""
match_cluster = None

max_sim = -1
max_sim: Union[int, float] = -1
max_param_count = -1
max_cluster = None

@@ -135,10 +149,10 @@

return match_cluster

def print_tree(self, file=None, max_clusters=5):
def print_tree(self, file: Optional[IO[str]] = None, max_clusters: int = 5) -> None:
self.print_node("root", self.root_node, 0, file, max_clusters)

def print_node(self, token, node, depth, file, max_clusters):
def print_node(self, token: str, node: Node, depth: int, file: Optional[IO[str]], max_clusters: int) -> None:
out_str = '\t' * depth

if depth == 0:
@@ -164,14 +178,14 @@ def print_node(self, token, node, depth, file, max_clusters):
out_str = '\t' * (depth + 1) + str(cluster)
print(out_str, file=file)

def get_content_as_tokens(self, content):
def get_content_as_tokens(self, content: str) -> Sequence[str]:
content = content.strip()
for delimiter in self.extra_delimiters:
content = content.replace(delimiter, " ")
content_tokens = content.split()
return content_tokens

def add_log_message(self, content: str):
def add_log_message(self, content: str) -> Tuple[LogCluster, str]:
content_tokens = self.get_content_as_tokens(content)

if self.profiler:
@@ -211,19 +225,19 @@ def add_log_message(self, content: str):

return match_cluster, update_type

def get_total_cluster_size(self):
def get_total_cluster_size(self) -> int:
size = 0
for c in self.id_to_cluster.values():
size += c.size
size += cast(LogCluster, c).size
return size

def get_clusters_ids_for_seq_len(self, seq_fir):
def get_clusters_ids_for_seq_len(self, seq_fir: Union[int, str]) -> Collection[int]:
"""
seq_fir: int/str - the first token of the sequence
Return all clusters with the specified count of tokens
"""

def append_clusters_recursive(node: Node, id_list_to_fill: list):
def append_clusters_recursive(node: Node, id_list_to_fill: MutableSequence[int]) -> None:
id_list_to_fill.extend(node.cluster_ids)
for child_node in node.key_to_child_node.values():
append_clusters_recursive(child_node, id_list_to_fill)
@@ -234,14 +248,42 @@ def append_clusters_recursive(node: Node, id_list_to_fill: list):
if cur_node is None:
return []

target = []
target: MutableSequence[int] = []
append_clusters_recursive(cur_node, target)
return target

@abstractmethod
def tree_search(self,
root_node: Node,
tokens: Sequence[str],
sim_th: float,
include_params: bool) -> Optional[LogCluster]:
...

@abstractmethod
def add_seq_to_prefix_tree(self, root_node: Node, cluster: LogCluster) -> None:
...

@abstractmethod
def get_seq_distance(self, seq1: Sequence[str], seq2: Sequence[str], include_params: bool) -> Tuple[float, int]:
...

@abstractmethod
def create_template(self, seq1: Sequence[str], seq2: Sequence[str]) -> Sequence[str]:
...

@abstractmethod
def match(self, content: str, full_search_strategy: str = "never") -> Optional[LogCluster]:
...


class Drain(DrainBase):

def tree_search(self, root_node: Node, tokens: list, sim_th: float, include_params: bool):
def tree_search(self,
root_node: Node,
tokens: Sequence[str],
sim_th: float,
include_params: bool) -> Optional[LogCluster]:

# at first level, children are grouped by token (word) count
token_count = len(tokens)
@@ -279,7 +321,7 @@ def tree_search(self, root_node: Node, tokens: list, sim_th: float, include_para
cluster = self.fast_match(cur_node.cluster_ids, tokens, sim_th, include_params)
return cluster

def add_seq_to_prefix_tree(self, root_node, cluster: LogCluster):
def add_seq_to_prefix_tree(self, root_node: Node, cluster: LogCluster) -> None:
token_count = len(cluster.log_template_tokens)
token_count_str = str(token_count)
if token_count_str not in root_node.key_to_child_node:
@@ -346,7 +388,7 @@ def add_seq_to_prefix_tree(self, root_node, cluster: LogCluster):
current_depth += 1

# seq1 is a template, seq2 is the log to match
def get_seq_distance(self, seq1, seq2, include_params: bool):
def get_seq_distance(self, seq1: Sequence[str], seq2: Sequence[str], include_params: bool) -> Tuple[float, int]:
assert len(seq1) == len(seq2)

# sequences are empty - full match
@@ -370,7 +412,7 @@ def get_seq_distance(self, seq1, seq2, include_params: bool):

return ret_val, param_count

def create_template(self, seq1, seq2):
def create_template(self, seq1: Sequence[str], seq2: Sequence[str]) -> Sequence[str]:
assert len(seq1) == len(seq2)
ret_val = list(seq2)

@@ -380,7 +422,7 @@ def create_template(self, seq1, seq2):

return ret_val

def match(self, content: str, full_search_strategy="never"):
def match(self, content: str, full_search_strategy: str = "never") -> Optional[LogCluster]:
"""
Match log message against an already existing cluster.
Match shall be perfect (sim_th=1.0).
@@ -410,7 +452,7 @@ def match(self, content: str, full_search_strategy="never"):
# wildcard match). This will be both accurate and more efficient than the linear full search
# also fast match can be optimized when exact match is required by early
# quitting on less than exact cluster matches.
def full_search():
def full_search() -> Optional[LogCluster]:
all_ids = self.get_clusters_ids_for_seq_len(len(content_tokens))
cluster = self.fast_match(all_ids, content_tokens, required_sim_th, include_params=True)
return cluster
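The if TYPE_CHECKING shim near the top of drain.py (see the linked mypy issue 4148) deserves a note: types-cachetools makes LRUCache generic in its stubs, while subscripting the runtime class may fail on some cachetools versions. A distilled sketch of the pattern, with illustrative names rather than drain3's:

    from typing import TYPE_CHECKING, Optional

    from cachetools import LRUCache

    if TYPE_CHECKING:
        # Only the type checker sees the subscripted base class.
        class _IntCache(LRUCache[str, Optional[int]]):
            ...
    else:
        # At runtime, the plain non-generic class is used.
        _IntCache = LRUCache

    cache = _IntCache(maxsize=128)
    cache["answer"] = 42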
7 changes: 4 additions & 3 deletions drain3/file_persistence.py
@@ -2,18 +2,19 @@

import os
import pathlib
from typing import Optional

from drain3.persistence_handler import PersistenceHandler


class FilePersistence(PersistenceHandler):
def __init__(self, file_path):
def __init__(self, file_path: str) -> None:
self.file_path = file_path

def save_state(self, state):
def save_state(self, state: bytes) -> None:
pathlib.Path(self.file_path).write_bytes(state)

def load_state(self):
def load_state(self) -> Optional[bytes]:
if not os.path.exists(self.file_path):
return None

20 changes: 13 additions & 7 deletions drain3/jaccard_drain.py
@@ -2,6 +2,8 @@
# This file implements the Drain algorithm for log parsing.
# Based on https://github.com/logpai/logparser/blob/master/logparser/Drain/Drain.py by LogPAI team

from typing import Optional, Sequence, Tuple

from drain3.drain import DrainBase, LogCluster, Node


@@ -12,7 +14,11 @@ class JaccardDrain(DrainBase):
Drain that uses Jaccard similarity to match log messages.
"""

def tree_search(self, root_node: Node, tokens: list, sim_th: float, include_params: bool):
def tree_search(self,
root_node: Node,
tokens: Sequence[str],
sim_th: float,
include_params: bool) -> Optional[LogCluster]:
# at first level, children are grouped by token (The first word in tokens)
token_count = len(tokens)
# cur_node = root_node.key_to_child_node.get(str(token_count))
@@ -60,7 +66,7 @@ def tree_search(self, root_node: Node, tokens: list, sim_th: float, include_para

return cluster

def add_seq_to_prefix_tree(self, root_node, cluster: LogCluster):
def add_seq_to_prefix_tree(self, root_node: Node, cluster: LogCluster) -> None:
token_count = len(cluster.log_template_tokens)
# Determine if the string is empty
if not cluster.log_template_tokens:
@@ -141,7 +147,7 @@ def add_seq_to_prefix_tree(self, root_node, cluster: LogCluster):
current_depth += 1

# seq1 is a template, seq2 is the log to match
def get_seq_distance(self, seq1: tuple, seq2: list, include_params: bool):
def get_seq_distance(self, seq1: Sequence[str], seq2: Sequence[str], include_params: bool) -> Tuple[float, int]:
# Jaccard index, It is used to measure the similarity of two sets.
# The closer its value is to 1, the more common members the two sets have, and the higher the similarity.

@@ -174,7 +180,7 @@ def get_seq_distance(self, seq1: tuple, seq2: list, include_params: bool):
return ret_val, param_count

# seq1:tonkens->list seq2:template->tuple
def create_template(self, seq1: list, seq2: tuple):
def create_template(self, seq1: Sequence[str], seq2: Sequence[str]) -> Sequence[str]:

inter_set = set(seq1) & set(seq2)

@@ -188,22 +194,22 @@ def create_template(self, seq1: list, seq2: tuple):
# param_str is updated at the new position with different length
else:
# Take the template with long length
ret_val = seq1 if len(seq1) > len(seq2) else list(seq2)
ret_val = list(seq1) if len(seq1) > len(seq2) else list(seq2)
for i, token in enumerate(ret_val):
if token not in inter_set:
ret_val[i] = self.param_str

return ret_val

def match(self, content: str, full_search_strategy="never"):
def match(self, content: str, full_search_strategy: str = "never") -> Optional[LogCluster]:

assert full_search_strategy in ["always", "never", "fallback"]

# Because the template length and data are not equal in length, Jaccard distance required_sim_th != 1
required_sim_th = 0.8
content_tokens = self.get_content_as_tokens(content)

def full_search():
def full_search() -> Optional[LogCluster]:
all_ids = self.get_clusters_ids_for_seq_len(content_tokens[0])
cluster = self.fast_match(all_ids, content_tokens, required_sim_th, include_params=True)
return cluster
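For reference, the Jaccard index that get_seq_distance is built around, as a standalone sketch; this helper is hypothetical, and the library's version additionally tracks param_str wildcard counts:

    from typing import Sequence

    def jaccard_similarity(seq1: Sequence[str], seq2: Sequence[str]) -> float:
        # |A & B| / |A | B| over token *sets*: order and repeats are ignored,
        # which is what lets Jaccard compare sequences of different lengths.
        s1, s2 = set(seq1), set(seq2)
        if not s1 and not s2:
            return 1.0  # two empty sequences count as a full match
        return len(s1 & s2) / len(s1 | s2)

    assert jaccard_similarity(["a", "b"], ["b", "c"]) == 1 / 3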
[Diff for the remaining 11 changed files not shown.]
