-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixed inconsistent schema during message produce; from now on it is possible to produce a message only if the schema sent with the record is registered to the topic.
- Loading branch information
Showing
22 changed files
with
487 additions
and
105 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,3 +19,4 @@ venv | |
.run | ||
.python-version | ||
.hypothesis/ | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
""" | ||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
|
||
from abc import ABC, abstractmethod | ||
from cachetools import TTLCache | ||
from karapace.schema_models import TypedSchema | ||
from karapace.typing import SchemaId, Subject | ||
from typing import Dict, Final, MutableMapping, Optional | ||
|
||
import hashlib | ||
|
||
|
||
class SchemaCacheProtocol(ABC):
    """Interface implemented by the schema caches in this module.

    A cache associates schema ids with schemas and also supports the reverse
    lookup from a schema to its id.
    """

    @abstractmethod
    def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
        """Return the id cached for ``schema``, or ``None`` if there is none."""

    @abstractmethod
    def has_schema_id(self, schema_id: SchemaId) -> bool:
        """Return whether ``schema_id`` is present in the cache."""

    @abstractmethod
    def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
        """Store ``schema`` in the cache under ``schema_id``."""

    @abstractmethod
    def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
        """Return the schema cached under ``schema_id``, or ``None``."""

    @abstractmethod
    def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
        """Return the cached schema for ``schema_id`` as a string, or ``None``."""
|
||
|
||
class TopicSchemaCache:
    """Schema caches keyed by topic.

    Each topic gets its own ``SchemaCache``, created the first time a schema is
    stored for it. Lookups for a topic without a cache are answered by a shared
    ``EmptySchemaCache``, so read paths never create entries.
    """

    def __init__(self) -> None:
        self._topic_cache: Dict[Subject, SchemaCache] = {}
        # Shared stand-in used for reads on topics that have no cache yet.
        self._empty_schema_cache: Final = EmptySchemaCache()

    def get_schema_id(self, topic: Subject, schema: TypedSchema) -> Optional[SchemaId]:
        """Return the id cached for ``schema`` on ``topic``, or ``None`` if unknown."""
        return self._topic_cache.get(topic, self._empty_schema_cache).get_schema_id(schema)

    def has_schema_id(self, topic: Subject, schema_id: SchemaId) -> bool:
        """Return whether ``schema_id`` is cached for ``topic``."""
        return self._topic_cache.get(topic, self._empty_schema_cache).has_schema_id(schema_id)

    def set_schema(self, topic: str, schema_id: SchemaId, schema: TypedSchema) -> None:
        """Cache ``schema`` under ``schema_id`` for ``topic``.

        The per-topic cache is created only when the topic is seen for the
        first time; ``dict.setdefault`` would evaluate its default eagerly and
        build a throwaway ``SchemaCache`` on every call.
        """
        subject = Subject(topic)
        schema_cache = self._topic_cache.get(subject)
        if schema_cache is None:
            schema_cache = SchemaCache()
            self._topic_cache[subject] = schema_cache
        schema_cache.set_schema(schema_id, schema)

    def get_schema(self, topic: Subject, schema_id: SchemaId) -> Optional[TypedSchema]:
        """Return the schema cached under ``schema_id`` for ``topic``, or ``None``."""
        schema_cache = self._topic_cache.get(topic, self._empty_schema_cache)
        return schema_cache.get_schema(schema_id)

    def get_schema_str(self, topic: Subject, schema_id: SchemaId) -> Optional[str]:
        """Return the cached schema for ``schema_id`` on ``topic`` as a string, or ``None``."""
        schema_cache = self._topic_cache.get(topic, self._empty_schema_cache)
        return schema_cache.get_schema_str(schema_id)
|
||
|
||
class SchemaCache(SchemaCacheProtocol):
    """Expiring schema cache with lookup by id and by schema content.

    Schemas are stored under their id in a ``TTLCache``. A second index maps
    the SHA-1 fingerprint of ``str(schema)`` to the schema id so the id of a
    schema can be found without knowing it up front.
    """

    def __init__(self) -> None:
        # The fingerprint index gets the same size bound and TTL as the schema
        # store. As a plain dict it was only pruned when the very same schema
        # was looked up again after expiry, so it could grow without limit.
        self._schema_hash_str_to_id: MutableMapping[str, SchemaId] = TTLCache(maxsize=10000, ttl=600)
        # NOTE: despite the attribute name, the values are TypedSchema objects.
        self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=10000, ttl=600)

    @staticmethod
    def _fingerprint(schema: TypedSchema) -> str:
        """Return the hex SHA-1 digest of the UTF-8 encoded ``str(schema)``."""
        return hashlib.sha1(str(schema).encode("utf8")).hexdigest()

    def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
        """Return the id cached for ``schema``, or ``None`` if it is not cached.

        An id is only returned while the schema itself is still in the schema
        store; a fingerprint pointing at a missing id is dropped.
        """
        fingerprint = self._fingerprint(schema)
        maybe_id = self._schema_hash_str_to_id.get(fingerprint)

        if maybe_id is not None and maybe_id not in self._id_to_schema_str:
            # The schema is gone from the store, so the fingerprint is stale.
            self._schema_hash_str_to_id.pop(fingerprint, None)
            return None

        return maybe_id

    def has_schema_id(self, schema_id: SchemaId) -> bool:
        """Return whether a schema is currently cached under ``schema_id``."""
        return schema_id in self._id_to_schema_str

    def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
        """Cache ``schema`` under ``schema_id`` and index it by fingerprint."""
        self._schema_hash_str_to_id[self._fingerprint(schema)] = schema_id
        self._id_to_schema_str[schema_id] = schema

    def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
        """Return the schema cached under ``schema_id``, or ``None``."""
        return self._id_to_schema_str.get(schema_id)

    def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
        """Return ``str()`` of the schema cached under ``schema_id``, or ``None``."""
        maybe_schema = self.get_schema(schema_id)
        return None if maybe_schema is None else str(maybe_schema)
|
||
|
||
class EmptySchemaCache(SchemaCacheProtocol):
    """Cache that never holds anything.

    Every lookup reports a miss and storing a schema is rejected.
    """

    def get_schema_id(self, schema: TypedSchema) -> None:
        """Always a miss: there is no id for any schema."""

    def has_schema_id(self, schema_id: SchemaId) -> bool:
        """Always ``False``: no schema id is ever present."""
        return False

    def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
        """Reject the write; this cache cannot store schemas.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("Empty schema cache. Cannot set schemas.")

    def get_schema(self, schema_id: SchemaId) -> None:
        """Always a miss: there is no schema for any id."""

    def get_schema_str(self, schema_id: SchemaId) -> None:
        """Always a miss: there is no schema string for any id."""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.