Replace sync Kafka Producers with confluent_kafka one #765
Conversation
Force-pushed from cad2885 to 69b591d (Compare)
This is extracted from the pre-existing `karapace/kafka_admin.py`, with the exception of the `translate_from_kafkaerror` function.
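For context, a minimal sketch of what such a translation helper might look like, assuming a hand-maintained mapping from `confluent_kafka` error codes to `kafka-python` exception classes (the specific mapping below is illustrative, not the implementation in this PR):

```python
# Illustrative sketch only: the concrete error mapping is an assumption,
# not the real translate_from_kafkaerror implementation.
from confluent_kafka import KafkaError, KafkaException
from kafka.errors import KafkaTimeoutError, TopicAlreadyExistsError, UnknownTopicOrPartitionError


def translate_from_kafkaerror(error: KafkaError) -> Exception:
    """Map a confluent_kafka KafkaError to a kafka-python-style exception."""
    code = error.code()
    if code == KafkaError.UNKNOWN_TOPIC_OR_PART:
        return UnknownTopicOrPartitionError()
    if code == KafkaError.TOPIC_ALREADY_EXISTS:
        return TopicAlreadyExistsError()
    if code in (KafkaError._TIMED_OUT, KafkaError._ALL_BROKERS_DOWN):
        return KafkaTimeoutError()
    # Anything unmapped falls back to confluent_kafka's own exception wrapper.
    return KafkaException(error)
```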
```python
self._errors: set[KafkaError] = set()
self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}")

super().__init__(self._get_config_from_params(bootstrap_servers, **params))  # type: ignore[call-arg]
```
The `type: ignore` is needed because mypy doesn't know the mixin will be used alongside classes that can handle this `__init__` call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.
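As a rough, hypothetical illustration of the pattern being described (class names below are made up; they are not the real Karapace classes): when mypy checks the mixin on its own, `super()` resolves to `object`, whose `__init__` takes no arguments, so the forwarded config call is flagged even though the runtime MRO of the combined class routes it to the `confluent_kafka` client.

```python
from __future__ import annotations

from confluent_kafka.admin import AdminClient


class _ConfigMixin:
    """Shared setup for confluent_kafka-based clients (illustrative only)."""

    def __init__(self, bootstrap_servers: str, **params: str | int) -> None:
        # The real code translates parameter names via a helper; this inline
        # dict is a simplification for the example.
        config = {"bootstrap.servers": bootstrap_servers, **params}
        # mypy sees only `object.__init__` here, hence the ignore; at runtime
        # the combined class's MRO sends this call to AdminClient.__init__.
        super().__init__(config)  # type: ignore[call-arg]


class MyAdminClient(_ConfigMixin, AdminClient):
    """_ConfigMixin.__init__ runs first and forwards the config to AdminClient."""


# Instantiation only builds the client config; no broker connection is needed yet.
client = MyAdminClient("localhost:9092")
```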
```python
def _activate_callbacks(self) -> None:
    # Any client in the `confluent_kafka` library needs `poll` called to
    # trigger any callbacks registered (eg. for errors, OAuth tokens, etc.)
    self.poll(timeout=0.0)  # type: ignore[attr-defined]
```
The `type: ignore` is needed because mypy doesn't know the mixin will be used alongside classes that can handle this `poll` call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.
""" | ||
for _ in range(3): | ||
try: | ||
self.list_topics(timeout=1) # type: ignore[attr-defined] |
The `type: ignore` is needed because mypy doesn't know the mixin will be used alongside classes that can handle this `list_topics` call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.
`karapace/kafka/admin.py` (Outdated)
This is moved from the pre-existing `karapace/kafka_admin.py`, with some of the code moved to `karapace/kafka/common.py`.
`karapace/kafka_admin.py` (Outdated)
Moved into `karapace/kafka/admin.py` and `karapace/kafka/common.py`.
Force-pushed from 69b591d to 54aae1e (Compare)
This change replaces all synchronous Kafka producers (from the kafka-python library) with a new implementation based on confluent-kafka-python's `Producer`. The aim is to keep the same interface as much as possible. The change so far doesn't intend to fully remove all references to kafka-python; most notably, developer-friendly errors and exceptions are still coming from kafka-python. A new `karapace.kafka` module is introduced, splitting the previously added admin client and the new producer into their own modules.

Resources:
* confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
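For a sense of how a confluent-kafka `Producer` can keep a kafka-python-style, future-based `send()` interface, here is a hedged sketch (the wrapper below is illustrative and simplified, not the exact code in this change):

```python
from __future__ import annotations

from concurrent.futures import Future

from confluent_kafka import KafkaException, Producer


class KafkaProducer(Producer):
    """Illustrative wrapper mimicking kafka-python's future-returning send()."""

    def send(self, topic: str, *, key: bytes | None = None, value: bytes | None = None) -> Future:
        result: Future = Future()

        def on_delivery(error, message) -> None:
            # Resolve the future the way kafka-python's producer would.
            if error is not None:
                result.set_exception(KafkaException(error))
            else:
                result.set_result(message)

        self.produce(topic, key=key, value=value, on_delivery=on_delivery)
        # Delivery callbacks only fire when the producer is polled or flushed.
        self.poll(0)
        return result
```

Callers would still need to `flush()` (or keep polling) before shutdown so that pending futures actually resolve.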
Force-pushed from 54aae1e to b606514 (Compare)
LGTM, nice work 👍
```python
).add_errback(producer_error_callback)
except (KafkaError, AssertionError) as ex:
    raise BackupDataRestorationError("Error while calling send on restoring messages") from ex

headers=[(key.decode(), value) for key, value in instruction.headers if key is not None],
```
Note for posterity, this was discussed out of band: header keys cannot be null, so it's correct to simplify handling here.
Why do that instead of asserting that it isn't None? That way, if the assumption isn't true, we are skipping a record. IMO we should fail if an assumption is violated rather than prove that it currently isn't violated; otherwise, if the implementation changes in the future, we get a skip instead of an error.
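To make the two options concrete, a small hypothetical comparison (the sample header list and helper name are made up for illustration):

```python
from __future__ import annotations

# Sample data standing in for the restored record's headers.
headers: list[tuple[bytes | None, bytes]] = [(b"schema-id", b"1"), (None, b"orphaned")]

# Approach in the diff: silently drop any header whose key is None.
skipping = [(key.decode(), value) for key, value in headers if key is not None]


# Approach suggested here: fail loudly when the assumption is violated.
def decode_header(key: bytes | None, value: bytes) -> tuple[str, bytes]:
    assert key is not None, "Kafka header keys must not be None"
    return key.decode(), value


try:
    failing = [decode_header(key, value) for key, value in headers]
except AssertionError as exc:
    print(f"Explicit failure instead of a silent skip: {exc}")
```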
"ResourceType", | ||
"TopicMetadata", | ||
) | ||
|
||
class AdminClient: | ||
def __init__(self, config: dict[str, str | int | Callable]) -> None: ... | ||
def poll(self, timeout: float) -> int: ... | ||
def poll(self, timeout: float = -1) -> int: ... |
Suggested change:

```diff
-    def poll(self, timeout: float = -1) -> int: ...
+    def poll(self, timeout: float = ...) -> int: ...
```
nit (non-blocking): Default values have no effect in type hints, and are usually omitted to eliminate duplication (see last paragraph of this section for reference).
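For reference, a small stub-style fragment showing the convention (the surrounding class is taken from the diff above; only the `...` default is the point here):

```python
# In a type stub the default value is never executed, so the placeholder `...`
# documents "this parameter has a default" without duplicating its value.
from collections.abc import Callable


class AdminClient:
    def __init__(self, config: dict[str, str | int | Callable]) -> None: ...
    def poll(self, timeout: float = ...) -> int: ...
```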
Previously in #765, skipping None header keys was added. While it isn't causing any problems now, it could become an issue in the future if we suppress/skip error cases instead of failing explicitly.
About this change - What it does
* Adds a new `confluent_kafka`-based producer client
* Moves the `KafkaAdminClient` and the new producer into a new `karapace.kafka` module, to contain all Kafka client code
* Still relies on `kafka-python`'s exception classes

Why this way
* Removal of `kafka-python` in the code can be done after sync consumers are replaced with `confluent-kafka`-based ones too, as well as removing the reliance on `kafka-python`'s exceptions