Skip to content

Commit

Permalink
Configure Kafka clients based on REST auth method
Browse files Browse the repository at this point in the history
In the previous commit the handling of the authentication header was
added to differentiate between basic and OAuth (Bearer token) based
authentication. With this change, based on the established
configuration, Kafka clients are configured differently.
  • Loading branch information
Mátyás Kuti committed Oct 4, 2023
1 parent 1e21729 commit 029aa99
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 22 deletions.
2 changes: 2 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Config(TypedDict):
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
sasl_oauth_token: str | None
topic_name: str
metadata_max_age_ms: int
admin_metadata_max_age: int
Expand Down Expand Up @@ -131,6 +132,7 @@ class ConfigDefaults(Config, total=False):
"sasl_mechanism": None,
"sasl_plain_username": None,
SASL_PLAIN_PASSWORD: None,
"sasl_oauth_token": None,
"topic_name": DEFAULT_SCHEMA_TOPIC,
"metadata_max_age_ms": 60000,
"admin_metadata_max_age": 5,
Expand Down
10 changes: 3 additions & 7 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header
from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header, get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
Expand Down Expand Up @@ -461,9 +461,7 @@ async def _maybe_create_async_producer(self) -> AIOKafkaProducer:
metadata_max_age_ms=self.config["metadata_max_age_ms"],
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
**get_kafka_client_auth_parameters_from_config(self.config),
)

try:
Expand Down Expand Up @@ -616,13 +614,11 @@ def init_admin_client(self):
ssl_cafile=self.config["ssl_cafile"],
ssl_certfile=self.config["ssl_certfile"],
ssl_keyfile=self.config["ssl_keyfile"],
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
api_version=(1, 0, 0),
metadata_max_age_ms=self.config["metadata_max_age_ms"],
connections_max_idle_ms=self.config["connections_max_idle_ms"],
kafka_client=KarapaceKafkaClient,
**get_kafka_client_auth_parameters_from_config(self.config, async_client=False),
)
break
except: # pylint: disable=bare-except
Expand Down
52 changes: 46 additions & 6 deletions karapace/kafka_rest_apis/auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
import aiohttp
import dataclasses
import enum
from aiokafka.abc import AbstractTokenProvider as AbstractTokenProviderAsync
from http import HTTPStatus
from typing import NoReturn, Optional, TypedDict, Union

from kafka.oauth.abstract import AbstractTokenProvider

from karapace.config import Config
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from typing import NoReturn, Optional, TypedDict, Union

import aiohttp
import dataclasses
import enum


@enum.unique
Expand Down Expand Up @@ -65,3 +65,43 @@ def get_auth_config_from_header(
}

raise_unauthorized()


@dataclasses.dataclass
class SimpleOauthTokenProvider(AbstractTokenProvider):
_token: str

def token(self) -> str:
return self._token


@dataclasses.dataclass
class SimpleOauthTokenProviderAsync(AbstractTokenProviderAsync):
_token: str

async def token(self) -> str:
return self._token


class SASLOauthParams(TypedDict):
sasl_mechanism: str
sasl_oauth_token_provider: Union[AbstractTokenProvider, AbstractTokenProviderAsync]


def get_kafka_client_auth_parameters_from_config(
config: Config,
*,
async_client: bool = True,
) -> Union[SASLPlainConfig, SASLOauthParams]:
if config["sasl_mechanism"] == "OAUTHBEARER":
token_provider_cls = SimpleOauthTokenProviderAsync if async_client else SimpleOauthTokenProvider
return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_oauth_token_provider": token_provider_cls(config["sasl_oauth_token"]),
}

return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_plain_username": config["sasl_plain_username"],
"sasl_plain_password": config["sasl_plain_password"],
}
5 changes: 2 additions & 3 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from kafka.errors import GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, KafkaError
from kafka.structs import TopicPartition
from karapace.config import Config, create_client_ssl_context
from karapace.kafka_rest_apis.auth_utils import get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.karapace import empty_response, KarapaceBase
from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer
Expand Down Expand Up @@ -205,9 +206,6 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
client_id=internal_name,
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
group_id=group_name,
fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values
fetch_max_bytes=self.config["consumer_request_max_bytes"],
Expand All @@ -218,6 +216,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
enable_auto_commit=request_data["auto.commit.enable"],
auto_offset_reset=request_data["auto.offset.reset"],
session_timeout_ms=session_timeout_ms,
**get_kafka_client_auth_parameters_from_config(self.config),
)
await c.start()
return c
Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ ignore_errors = True
# dependencies.
# - Write your own stubs. You don't need to write stubs for the whole library,
# only the parts that Karapace is interacting with.
[mypy-aiokafka.*]
ignore_missing_imports = True

[mypy-kafka.*]
ignore_missing_imports = True

Expand Down
54 changes: 48 additions & 6 deletions tests/unit/test_auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
import base64
from http import HTTPStatus
from typing import Optional

import pytest

from karapace.config import set_config_defaults, ConfigDefaults
from karapace.config import ConfigDefaults, set_config_defaults
from karapace.kafka_rest_apis.auth_utils import (
get_auth_config_from_header,
get_kafka_client_auth_parameters_from_config,
SimpleOauthTokenProvider,
SimpleOauthTokenProviderAsync,
)
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from typing import Optional

import base64
import pytest


@pytest.mark.parametrize(
Expand Down Expand Up @@ -65,3 +65,45 @@ def test_get_auth_config_from_header(
config = set_config_defaults(config_override)
auth_config = get_auth_config_from_header(auth_header, config)
assert auth_config == expected_auth_config


def test_simple_oauth_token_provider_returns_configured_token() -> None:
token_provider = SimpleOauthTokenProvider("TOKEN")
assert token_provider.token() == "TOKEN"


async def test_simple_oauth_token_provider_async_returns_configured_token() -> None:
token_provider = SimpleOauthTokenProviderAsync("TOKEN")
assert await token_provider.token() == "TOKEN"


def test_get_client_auth_parameters_from_config_sasl_plain() -> None:
config = set_config_defaults(
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}
)

client_auth_params = get_kafka_client_auth_parameters_from_config(config)

assert client_auth_params == {
"sasl_mechanism": "PLAIN",
"sasl_plain_username": "username",
"sasl_plain_password": "password",
}


def test_get_client_auth_parameters_from_config_oauth() -> None:
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"})

client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=False)

assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER"
assert client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"


async def test_get_client_auth_parameters_from_config_oauth_async() -> None:
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"})

client_auth_params = get_kafka_client_auth_parameters_from_config(config, async_client=True)

assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER"
assert await client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"

0 comments on commit 029aa99

Please sign in to comment.