Skip to content

Commit

Permalink
Merge pull request #388 from aiven/hacka-schema-reader-loop-improvements
Browse files Browse the repository at this point in the history
Schema reader loop improvements
  • Loading branch information
tvainika authored Apr 21, 2022
2 parents 27f10b9 + c996932 commit 3aca4d3
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 105 deletions.
4 changes: 2 additions & 2 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
from karapace.config import Config, read_config
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_reader import new_schema_topic_from_config
from karapace.utils import json_encode, KarapaceKafkaClient, Timeout
from typing import Dict, List, Optional, Tuple

Expand Down Expand Up @@ -108,7 +108,7 @@ def _create_schema_topic_if_needed(self):
if time.monotonic() - start_time > wait_time:
raise Timeout(f"Timeout ({wait_time}) on creating admin client")

schema_topic = KafkaSchemaReader.get_new_schema_topic(self.config)
schema_topic = new_schema_topic_from_config(self.config)
try:
LOG.info("Creating schema topic: %r", schema_topic)
self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
Expand Down
213 changes: 110 additions & 103 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
Copyright (c) 2019 Aiven Ltd
See LICENSE for details
"""
from contextlib import closing, ExitStack
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import KafkaConfigurationError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from karapace import constants
from karapace.config import Config
from karapace.master_coordinator import MasterCoordinator
Expand All @@ -17,7 +18,6 @@
from typing import Any, Dict, Optional

import logging
import time
import ujson

Offset = int
Expand All @@ -32,6 +32,58 @@
OFFSET_EMPTY = -1
LOG = logging.getLogger(__name__)

# Seconds the reader waits on its stop event before retrying after a failed
# attempt to create a Kafka client (admin client or consumer).
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS = 2.0
# Seconds the reader waits on its stop event before retrying after a failed
# attempt to create the schema topic.
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS = 5.0


def _create_consumer_from_config(config: Config) -> KafkaConsumer:
    """Build the `KafkaConsumer` subscribed to the topic named by ``config["topic_name"]``.

    No consumer group is set on purpose: all consumers read the same data.
    Auto-commit is disabled and ``auto_offset_reset`` is ``"earliest"``.
    """
    session_timeout_ms = config["session_timeout_ms"]
    # Use whichever is larger: the configured session timeout or the
    # consumer's default request timeout.
    default_request_timeout_ms = KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]
    request_timeout_ms = max(session_timeout_ms, default_request_timeout_ms)
    topic_name = config["topic_name"]

    consumer_options = {
        "enable_auto_commit": False,
        "api_version": (1, 0, 0),
        "bootstrap_servers": config["bootstrap_uri"],
        "client_id": config["client_id"],
        "security_protocol": config["security_protocol"],
        "ssl_cafile": config["ssl_cafile"],
        "ssl_certfile": config["ssl_certfile"],
        "ssl_keyfile": config["ssl_keyfile"],
        "sasl_mechanism": config["sasl_mechanism"],
        "sasl_plain_username": config["sasl_plain_username"],
        "sasl_plain_password": config["sasl_plain_password"],
        "auto_offset_reset": "earliest",
        "session_timeout_ms": session_timeout_ms,
        "request_timeout_ms": request_timeout_ms,
        "kafka_client": KarapaceKafkaClient,
        "metadata_max_age_ms": config["metadata_max_age_ms"],
    }
    return KafkaConsumer(topic_name, **consumer_options)


def _create_admin_client_from_config(config: Config) -> KafkaAdminClient:
    """Build a `KafkaAdminClient` from the connection settings in *config*.

    Any exception raised by the `KafkaAdminClient` constructor propagates to
    the caller.
    """
    admin_options = {
        "api_version_auto_timeout_ms": constants.API_VERSION_AUTO_TIMEOUT_MS,
        "bootstrap_servers": config["bootstrap_uri"],
        "client_id": config["client_id"],
        "security_protocol": config["security_protocol"],
        "ssl_cafile": config["ssl_cafile"],
        "ssl_certfile": config["ssl_certfile"],
        "ssl_keyfile": config["ssl_keyfile"],
        "sasl_mechanism": config["sasl_mechanism"],
        "sasl_plain_username": config["sasl_plain_username"],
        "sasl_plain_password": config["sasl_plain_password"],
    }
    return KafkaAdminClient(**admin_options)


def new_schema_topic_from_config(config: Config) -> NewTopic:
    """Return the `NewTopic` description of the schema topic.

    The name and replication factor come from *config*; the partition count
    comes from ``constants.SCHEMA_TOPIC_NUM_PARTITIONS`` and the topic is
    configured with ``cleanup.policy=compact``.
    """
    topic_name = config["topic_name"]
    replication_factor = config["replication_factor"]
    topic_settings = {"cleanup.policy": "compact"}
    return NewTopic(
        name=topic_name,
        num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
        replication_factor=replication_factor,
        topic_configs=topic_settings,
    )


class OffsetsWatcher:
"""Synchronization container for threads to wait until an offset is seen.
Expand Down Expand Up @@ -89,11 +141,9 @@ def __init__(
self.schemas: Dict[int, TypedSchema] = {}
self.global_schema_id = 0
self.admin_client: Optional[KafkaAdminClient] = None
self.schema_topic = None
self.topic_replication_factor = self.config["replication_factor"]
self.consumer: Optional[KafkaConsumer] = None
self.offset_watcher = OffsetsWatcher()
self.running = True
self.id_lock = Lock()
self.stats = StatsClient(
sentry_config=config["sentry"], # type: ignore[arg-type]
Expand All @@ -113,80 +163,9 @@ def __init__(
self.offset = OFFSET_EMPTY
self.ready = False

def init_consumer(self) -> None:
    """Create the consumer for the configured topic and store it on ``self.consumer``.

    No consumer group is set on purpose: all consumers read the same data.
    """
    cfg = self.config
    session_timeout_ms = cfg["session_timeout_ms"]
    # Use whichever is larger: the configured session timeout or the
    # consumer's default request timeout.
    request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"])
    topic_name = cfg["topic_name"]

    consumer_options = dict(
        enable_auto_commit=False,
        api_version=(1, 0, 0),
        bootstrap_servers=cfg["bootstrap_uri"],
        client_id=cfg["client_id"],
        security_protocol=cfg["security_protocol"],
        ssl_cafile=cfg["ssl_cafile"],
        ssl_certfile=cfg["ssl_certfile"],
        ssl_keyfile=cfg["ssl_keyfile"],
        sasl_mechanism=cfg["sasl_mechanism"],
        sasl_plain_username=cfg["sasl_plain_username"],
        sasl_plain_password=cfg["sasl_plain_password"],
        auto_offset_reset="earliest",
        session_timeout_ms=session_timeout_ms,
        request_timeout_ms=request_timeout_ms,
        kafka_client=KarapaceKafkaClient,
        metadata_max_age_ms=cfg["metadata_max_age_ms"],
    )
    self.consumer = KafkaConsumer(topic_name, **consumer_options)

def init_admin_client(self) -> bool:
    """Make one attempt to create the Kafka admin client.

    On success the client is stored on ``self.admin_client``.

    Returns:
        True when the client was created. False when creation failed; in
        that case the method has already slept for 2 seconds, so the caller
        can retry immediately.
    """
    try:
        self.admin_client = KafkaAdminClient(
            api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
            bootstrap_servers=self.config["bootstrap_uri"],
            client_id=self.config["client_id"],
            security_protocol=self.config["security_protocol"],
            ssl_cafile=self.config["ssl_cafile"],
            ssl_certfile=self.config["ssl_certfile"],
            ssl_keyfile=self.config["ssl_keyfile"],
            sasl_mechanism=self.config["sasl_mechanism"],
            sasl_plain_username=self.config["sasl_plain_username"],
            sasl_plain_password=self.config["sasl_plain_password"],
        )
    except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
        LOG.warning("No Brokers available yet, retrying init_admin_client()")
    except Exception:  # pylint: disable=broad-except
        # Deliberately broad so the caller can keep retrying, but no longer a
        # bare ``except:`` so SystemExit and KeyboardInterrupt still propagate.
        LOG.exception("Failed to initialize admin client, retrying init_admin_client()")
    else:
        return True

    # Both failure paths back off for the same amount of time before the retry.
    time.sleep(2.0)
    return False

@staticmethod
def get_new_schema_topic(config: dict) -> NewTopic:
    """Return the `NewTopic` description of the schema topic.

    The name and replication factor come from *config*; the partition count
    comes from ``constants.SCHEMA_TOPIC_NUM_PARTITIONS`` and the topic is
    configured with ``cleanup.policy=compact``.
    """
    topic_name = config["topic_name"]
    replication_factor = config["replication_factor"]
    topic_settings = {"cleanup.policy": "compact"}
    return NewTopic(
        name=topic_name,
        num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
        replication_factor=replication_factor,
        topic_configs=topic_settings,
    )

def create_schema_topic(self) -> bool:
    """Make one attempt to create the schema topic.

    A topic that already exists counts as success. On success the topic
    description is stored on ``self.schema_topic``.

    Returns:
        True when the topic was created or already existed. False when
        creation failed; in that case the method has already slept for
        5 seconds, so the caller can retry immediately.
    """
    assert self.admin_client is not None, "Thread must be started"

    schema_topic = self.get_new_schema_topic(self.config)
    topic_name = self.config["topic_name"]
    try:
        LOG.info("Creating topic: %r", schema_topic)
        self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
        LOG.info("Topic: %r created successfully", topic_name)
    except TopicAlreadyExistsError:
        LOG.warning("Topic: %r already exists", topic_name)
    except Exception:  # pylint: disable=broad-except
        # Deliberately broad so the caller can keep retrying, but no longer a
        # bare ``except:`` so SystemExit and KeyboardInterrupt still propagate.
        LOG.exception("Failed to create topic: %r, retrying create_schema_topic()", topic_name)
        time.sleep(5)
        return False

    # Reached both when the topic was just created and when it already existed.
    self.schema_topic = schema_topic
    return True
# This event controls when the Reader should stop running, it will be
# set by another thread (e.g. `KarapaceSchemaRegistry`)
self._stop = Event()

def get_schema_id(self, new_schema: TypedSchema) -> int:
with self.id_lock:
Expand All @@ -198,34 +177,62 @@ def get_schema_id(self, new_schema: TypedSchema) -> int:

def close(self) -> None:
    """Ask the reader to stop.

    Clears the ``running`` flag and sets the ``_stop`` event. This method
    does not close any Kafka client itself; that is left to ``run``, whose
    loops check these signals.
    """
    LOG.info("Closing schema_reader")
    self.running = False
    self._stop.set()

def run(self) -> None:
    """Set up the Kafka clients and the schema topic, then process messages.

    The work happens in four phases; each one is retried until it succeeds
    or the stop event is set:

    1. create the admin client,
    2. create the consumer,
    3. create the schema topic (an already existing topic counts as success),
    4. call ``handle_messages`` repeatedly.

    Both clients are registered on an `ExitStack` through `closing`, so they
    are closed when this method returns or raises.

    Raises:
        KafkaConfigurationError: a client rejected the configuration. The
            error is logged and re-raised instead of being retried.
    """
    with ExitStack() as stack:
        while not self._stop.is_set() and self.admin_client is None:
            try:
                self.admin_client = _create_admin_client_from_config(self.config)
                stack.enter_context(closing(self.admin_client))
            except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
                LOG.warning("[Admin Client] No Brokers available yet. Retrying")
                self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
            except KafkaConfigurationError:
                LOG.exception("[Admin Client] Invalid configuration. Bailing")
                raise
            except Exception as e:  # pylint: disable=broad-except
                LOG.exception("[Admin Client] Unexpected exception. Retrying")
                self.stats.unexpected_exception(ex=e, where="admin_client_instantiation")
                self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)

        while not self._stop.is_set() and self.consumer is None:
            try:
                self.consumer = _create_consumer_from_config(self.config)
                stack.enter_context(closing(self.consumer))
            except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
                LOG.warning("[Consumer] No Brokers available yet. Retrying")
                self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
            except KafkaConfigurationError:
                LOG.exception("[Consumer] Invalid configuration. Bailing")
                raise
            except Exception as e:  # pylint: disable=broad-except
                LOG.exception("[Consumer] Unexpected exception. Retrying")
                self.stats.unexpected_exception(ex=e, where="consumer_instantiation")
                self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)

        schema_topic_exists = False
        schema_topic = new_schema_topic_from_config(self.config)
        schema_topic_create = [schema_topic]
        # This loop body only runs when the stop event is not set, which means
        # the admin-client loop above ended because the client was created.
        while not self._stop.is_set() and not schema_topic_exists:
            try:
                LOG.info("[Schema Topic] Creating %r", schema_topic)
                self.admin_client.create_topics(schema_topic_create, timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
                LOG.info("[Schema Topic] Successfully created %r", schema_topic.name)
                schema_topic_exists = True
            except TopicAlreadyExistsError:
                LOG.warning("[Schema Topic] Already exists %r", schema_topic.name)
                schema_topic_exists = True
            except Exception:  # pylint: disable=broad-except
                # Broad on purpose so creation keeps being retried, but not a
                # bare ``except:`` so SystemExit and KeyboardInterrupt propagate.
                LOG.exception("[Schema Topic] Failed to create %r, retrying", schema_topic.name)
                self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)

        while not self._stop.is_set():
            try:
                self.handle_messages()
            except Exception as e:  # pylint: disable=broad-except
                # Keep the reader alive: report the failure and try again.
                self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
                LOG.exception("Unexpected exception in schema reader loop")

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"
Expand Down

0 comments on commit 3aca4d3

Please sign in to comment.