Skip to content

Commit

Permalink
Merge pull request #660 from aiven/giuseppelillo/fix-producer-error
Browse files Browse the repository at this point in the history
fix: Correctly raise producer exceptions
  • Loading branch information
jjaakola-aiven authored Jun 16, 2023
2 parents b13553a + 86fac38 commit ec75a34
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 22 deletions.
51 changes: 29 additions & 22 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.future import Future
from kafka.structs import PartitionMetadata, TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
Expand All @@ -40,7 +39,7 @@
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, NoReturn, TypeVar
from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, TypeVar

import contextlib
import datetime
Expand Down Expand Up @@ -263,16 +262,6 @@ def _consumer(config: Config, topic: str) -> Iterator[KafkaConsumer]:
yield consumer


@contextlib.contextmanager
def _enable_producer_callback_errors() -> Iterator[None]:
global_value = Future.error_on_callbacks
Future.error_on_callbacks = True
try:
yield None
finally:
Future.error_on_callbacks = global_value


@contextlib.contextmanager
def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
"""Creates an automatically closing Kafka producer client.
Expand All @@ -282,10 +271,9 @@ def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
:raises PartitionCountError: if the topic does not have exactly one partition.
:raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation.
"""
with _enable_producer_callback_errors():
with kafka_producer_from_config(config) as producer:
__check_partition_count(topic, producer.partitions_for)
yield producer
with kafka_producer_from_config(config) as producer:
__check_partition_count(topic, producer.partitions_for)
yield producer


def _normalize_location(input_location: str) -> Path | StdOut:
Expand Down Expand Up @@ -390,13 +378,10 @@ def _handle_restore_topic(
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")


def _raise_backup_error(exception: Exception) -> NoReturn:
raise BackupDataRestorationError("Error while producing restored messages") from exception


def _handle_producer_send(
instruction: ProducerSend,
producer: KafkaProducer,
producer_error_callback: Callable[[Exception], None],
) -> None:
LOG.debug(
"Sending kafka msg key: %r, value: %r",
Expand All @@ -411,7 +396,7 @@ def _handle_producer_send(
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp_ms=instruction.timestamp,
).add_errback(_raise_backup_error)
).add_errback(producer_error_callback)
except (KafkaError, AssertionError) as ex:
raise BackupDataRestorationError("Error while calling send on restoring messages") from ex

Expand Down Expand Up @@ -446,11 +431,23 @@ def restore_backup(
LOG.info("Identified backup backend: %s", backend.__class__.__name__)
LOG.info("Starting backup restore for topic: %r", topic_name)

# Stores the latest exception raised by the error callback set on producer.send()
_producer_exception = None

# We set up an ExitStack context, so that we can enter the producer context only
# after processing a RestoreTopic instruction.
with contextlib.ExitStack() as stack:
producer = None

def _producer_error_callback(exception: Exception) -> None:
LOG.error("Producer error", exc_info=exception)
nonlocal _producer_exception
_producer_exception = exception

def _check_producer_exception() -> None:
if _producer_exception is not None:
raise BackupDataRestorationError("Error while producing restored messages") from _producer_exception

for instruction in backend.read(backup_location, topic_name):
if isinstance(instruction, RestoreTopicLegacy):
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
Expand All @@ -461,10 +458,20 @@ def restore_backup(
elif isinstance(instruction, ProducerSend):
if producer is None:
raise RuntimeError("Backend has not yet sent RestoreTopic.")
_handle_producer_send(instruction, producer)
_handle_producer_send(instruction, producer, _producer_error_callback)
# Immediately check if producer.send() generated an exception. This call is
# only an optimization, as producing is asynchronous and no sends might
# have been executed once we reach this line.
_check_producer_exception()
else:
assert_never(instruction)

# Check if an exception was raised after the producer was flushed and closed
# by `kafka_producer_from_config` context manager. As opposed to the previous
# call, this one is essential for correct behavior, as when we reach this point the
# producer can no longer be sending messages (it has been flushed and closed).
_check_producer_exception()


def create_backup(
config: Config,
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,38 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_producer_raises_exceptions(
admin_client: KafkaAdminClient,
kafka_servers: KafkaServers,
) -> None:
topic_name = "596ddf6b"
backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name
metadata_path = backup_directory / f"{topic_name}.metadata"

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topics([topic_name])
except UnknownTopicOrPartitionError:
print("No previously existing topic.")
else:
print("Deleted topic from previous run.")

config = set_config_defaults(
{
"bootstrap_uri": kafka_servers.bootstrap_servers,
}
)

with patch("kafka.producer.record_accumulator.RecordAccumulator.append") as p:
p.side_effect = UnknownTopicOrPartitionError()
with pytest.raises(BackupDataRestorationError):
api.restore_backup(
config=config,
backup_location=metadata_path,
topic_name=TopicName(topic_name),
)


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
Expand Down

0 comments on commit ec75a34

Please sign in to comment.