diff --git a/README.rst b/README.rst index a20406805..c2500c1cd 100644 --- a/README.rst +++ b/README.rst @@ -390,17 +390,27 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ - Filename to a private key for the Karapace server in HTTPS mode. * - ``registry_host`` - ``127.0.0.1`` - - Kafka Registry host, used by Kafka Rest for Avro related requests. + - Schema Registry host, used by Kafka Rest for schema related requests. If running both in the same process, it should be left to its default value * - ``registry_port`` - ``8081`` - - Kafka Registry port, used by Kafka Rest for Avro related requests. + - Schema Registry port, used by Kafka Rest for schema related requests. If running both in the same process, it should be left to its default value + * - ``registry_user`` + - ``None`` + - Schema Registry user for authentication, used by Kafka Rest for schema related requests. + * - ``registry_password`` + - ``None`` + - Schema Registry password for authentication, used by Kafka Rest for schema related requests. * - ``registry_ca`` - ``/path/to/cafile`` - Kafka Registry CA certificate, used by Kafka Rest for Avro related requests. If this is set, Kafka Rest will use HTTPS to connect to the registry. If running both in the same process, it should be left to its default value + * - ``registry_authfile`` + - ``/path/to/authfile.json`` + - Filename to specify users and access control rules for Karapace Schema Registry. + If this is set, Schema Segistry requires authentication for most of the endpoints and applies per endpoint authorization rules. * - ``metadata_max_age_ms`` - ``60000`` - Period of time in milliseconds after Kafka metadata is force refreshed. @@ -424,6 +434,105 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) + +Authentication and authorization of Karapace Schema Registry REST API +===================================================================== + +To enable HTTP Basic Authentication and user authorization the authorization configuration file is set in the main configuration key ``registry_authfile`` of the Karapace. + +Karapace Schema Registry authorization file is an optional JSON configuration, which contains a list of authorized users in ``users`` and a list of access control rules in ``permissions``. + +Each user entry contains following attributes: + +.. list-table:: + :header-rows: 1 + + * - Parameter + - Description + * - ``username`` + - A string + * - ``algorithm`` + - One of supported hashing algorithms, ``scrypt``, ``sha1``, ``sha256``, or ``sha512`` + * - ``salt`` + - Salt used for hashing the password + * - ``password_hash`` + - Hash string of the password calculated using given algorithm and salt. + +Password hashing can be done using ``karapace_mkpasswd`` tool, if installed, or by invoking directly with ``python -m karapace.auth``. The tool generates JSON entry with these fields. :: + + $ karapace_mkpasswd -u user -a sha512 secret + { + "username": "user", + "algorithm": "sha512", + "salt": "iuLouaExTeg9ypqTxqP-dw", + "password_hash": "R6ghYSXdLGsq6hkQcg8wT4xkD4QToxBhlp7NerTnyB077M+mD2qiN7ZxXCDb4aE+5lExu5P11UpMPYAcVYxSQA==" + } + +Each access control rule contains following attributes: + +.. list-table:: + :header-rows: 1 + + * - Parameter + - Description + * - ``username`` + - A string to match against authenticated user + * - ``operation`` + - Exact value of ``Read`` or ``Write``. Write implies also read permissions. Write includes all mutable operations, e.g. deleting schema versions + * - ``resource`` + - A regular expression used to match against accessed resource. + +Supported resource authorization: + +.. list-table:: + :header-rows: 1 + + * - Resource + - Description + * - ``Config:`` + - Controls authorization to global schema registry configuration. + * - ``Subject:`` + - Controls authorization to subject. The ```` is a regular expression to match against the accessed subject. + +Example of complete authorization file +-------------------------------------- + +:: + + { + "users": [ + { + "username": "admin", + "algorithm": "scrypt", + "salt": "", + "password_hash": "" + }, + { + "username": "plainuser", + "algorithm": "sha256", + "salt": "", + "password_hash": "" + } + ], + "permissions": [ + { + "username": "admin", + "operation": "Write", + "resource": ".*" + }, + { + "username": "plainuser", + "operation": "Read", + "resource": "Subject:general.*" + }, + { + "username": "plainuser", + "operation": "Read", + "resource": "Config:" + } + ] + } + Uninstall ========= diff --git a/karapace.config.json b/karapace.config.json index 3f41510bb..ed4012492 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -22,6 +22,7 @@ "sasl_plain_password": null, "karapace_rest": true, "karapace_registry": true, + "registry_authfile": null, "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000 diff --git a/karapace/auth.py b/karapace/auth.py new file mode 100644 index 000000000..44cfe6ae7 --- /dev/null +++ b/karapace/auth.py @@ -0,0 +1,211 @@ +from base64 import b64encode +from dataclasses import dataclass, field +from enum import Enum, unique +from hmac import compare_digest +from karapace.config import InvalidConfiguration +from karapace.rapu import JSON_CONTENT_TYPE +from karapace.statsd import StatsClient +from typing import Optional + +import aiohttp +import aiohttp.web +import argparse +import asyncio +import base64 +import hashlib +import json +import logging +import os +import re +import secrets +import sys + +log = logging.getLogger(__name__) + + +@unique +class Operation(Enum): + Read = "Read" + Write = "Write" + + +@unique +class HashAlgorithm(Enum): + SHA1 = "sha1" + SHA256 = "sha256" + SHA512 = "sha512" + SCRYPT = "scrypt" + + +def hash_password(algorithm: HashAlgorithm, salt: str, plaintext_password: str) -> str: + if algorithm in [HashAlgorithm.SHA1, HashAlgorithm.SHA256, HashAlgorithm.SHA512]: + return b64encode( + hashlib.pbkdf2_hmac(algorithm.value, bytearray(plaintext_password, "UTF-8"), bytearray(salt, "UTF-8"), 5000) + ).decode("ascii") + if algorithm == HashAlgorithm.SCRYPT: + return str( + base64.b64encode( + hashlib.scrypt(bytearray(plaintext_password, "utf-8"), salt=bytearray(salt, "utf-8"), n=16384, r=8, p=1) + ), + encoding="utf-8", + ) + raise NotImplementedError(f"Hash algorithm '{algorithm}' is not implemented") + + +@dataclass +class User: + username: str + algorithm: HashAlgorithm + salt: str + password_hash: str = field(repr=False) + + def compare_password(self, plaintext_password: str) -> bool: + return compare_digest(self.password_hash, hash_password(self.algorithm, self.salt, plaintext_password)) + + +@dataclass(frozen=True) +class ACLEntry: + username: str + operation: Operation + resource: re.Pattern + + +class HTTPAuthorizer: + def __init__(self, filename: str) -> None: + self._auth_filename: str = filename + self._refresh_auth_task: Optional[asyncio.Task] = None + # Once first, can raise if file not valid + self._load_authfile() + + async def start_refresh_task(self, stats: StatsClient) -> None: + """Start authfile refresher task""" + + async def _refresh_authfile() -> None: + """Reload authfile, but keep old auth data if loading fails""" + + last_loaded = os.path.getmtime(self._auth_filename) + + while True: + try: + await asyncio.sleep(5) + last_modified = os.path.getmtime(self._auth_filename) + if last_loaded < last_modified: + self._load_authfile() + last_loaded = last_modified + except asyncio.CancelledError: + log.info("Closing schema registry ACL refresh task") + return + except Exception as ex: # pylint: disable=broad-except + log.exception("Schema registry auth file could not be loaded") + stats.unexpected_exception(ex=ex, where="schema_registry_authfile_reloader") + + self._refresh_auth_task = asyncio.create_task(_refresh_authfile()) + + async def close(self) -> None: + if self._refresh_auth_task is not None: + self._refresh_auth_task.cancel() + self._refresh_auth_task = None + + def _load_authfile(self) -> None: + try: + with open(self._auth_filename, "r") as authfile: + authdata = json.load(authfile) + + users = { + user["username"]: User( + username=user["username"], + algorithm=HashAlgorithm(user["algorithm"]), + salt=user["salt"], + password_hash=user["password_hash"], + ) + for user in authdata["users"] + } + permissions = [ + ACLEntry(entry["username"], Operation(entry["operation"]), re.compile(entry["resource"])) + for entry in authdata["permissions"] + ] + self.userdb = users + log.info( + "Loaded schema registry users: %s", + users, + ) + self.permissions = permissions + log.info( + "Loaded schema registry access control rules: %s", + [(entry.username, entry.operation.value, entry.resource.pattern) for entry in permissions], + ) + except Exception as ex: + raise InvalidConfiguration("Failed to load auth file") from ex + + def check_authorization(self, user: Optional[User], operation: Operation, resource: str) -> bool: + if user is None: + return False + + def check_operation(operation: Operation, aclentry: ACLEntry) -> bool: + """Does ACL entry allow given operation. + + An entry at minimum gives Read permission. Write permission implies Read.""" + return operation == Operation.Read or aclentry.operation == Operation.Write + + def check_resource(resource: str, aclentry: ACLEntry) -> bool: + return aclentry.resource.match(resource) is not None + + for aclentry in self.permissions: + if ( + aclentry.username == user.username + and check_operation(operation, aclentry) + and check_resource(resource, aclentry) + ): + return True + return False + + def authenticate(self, request: aiohttp.web.Request) -> User: + auth_header = request.headers.get("Authorization") + if auth_header is None: + raise aiohttp.web.HTTPUnauthorized( + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + text='{"message": "Unauthorized"}', + content_type=JSON_CONTENT_TYPE, + ) + try: + auth = aiohttp.BasicAuth.decode(auth_header) + except ValueError: + # pylint: disable=raise-missing-from + raise aiohttp.web.HTTPUnauthorized( + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + text='{"message": "Unauthorized"}', + content_type=JSON_CONTENT_TYPE, + ) + user = self.userdb.get(auth.login) + if user is None or not user.compare_password(auth.password): + raise aiohttp.web.HTTPUnauthorized( + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + text='{"message": "Unauthorized"}', + content_type=JSON_CONTENT_TYPE, + ) + + return user + + +def main() -> int: + parser = argparse.ArgumentParser(prog="karapace_mkpasswd", description="Karapace password hasher") + parser.add_argument("-u", "--user", help="Username", type=str) + parser.add_argument( + "-a", "--algorithm", help="Hash algorithm", choices=["sha1", "sha256", "sha512", "scrypt"], default="sha512" + ) + parser.add_argument(metavar="password", dest="plaintext_password", help="Password to hash", type=str) + parser.add_argument("salt", help="Salt for hashing, random generated if not given", nargs="?", type=str) + args = parser.parse_args() + salt: str = args.salt or secrets.token_urlsafe(nbytes=16) + result = {} + if args.user: + result["username"] = args.user + result["algorithm"] = args.algorithm + result["salt"] = salt + result["password_hash"] = hash_password(HashAlgorithm(args.algorithm), salt, args.plaintext_password) + print(json.dumps(result, indent=4)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/karapace/client.py b/karapace/client.py index bbb69b2a1..ec6cb0e2e 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -4,7 +4,7 @@ Copyright (c) 2022 Aiven Ltd See LICENSE for details """ -from aiohttp import ClientSession +from aiohttp import BasicAuth, ClientSession from collections.abc import Mapping from karapace.typing import JsonData from typing import Awaitable, Callable, Optional, Union @@ -19,8 +19,8 @@ LOG = logging.getLogger(__name__) -async def _get_aiohttp_client() -> ClientSession: - return ClientSession() +async def _get_aiohttp_client(*, auth: Optional[BasicAuth] = None) -> ClientSession: + return ClientSession(auth=auth) class Result: @@ -49,10 +49,12 @@ class Client: def __init__( self, server_uri: Optional[str] = None, - client_factory: Callable[[], Awaitable[ClientSession]] = _get_aiohttp_client, + client_factory: Callable[..., Awaitable[ClientSession]] = _get_aiohttp_client, server_ca: Optional[str] = None, + session_auth: Optional[BasicAuth] = None, ) -> None: self.server_uri = server_uri or "" + self.session_auth = session_auth # aiohttp requires to be in the same async loop when creating its client and when using it. # Since karapace Client object is initialized before creating the async context, (in # kafka_rest_api main, when KafkaRest is created), we can't create the aiohttp here. @@ -79,7 +81,7 @@ async def close(self) -> None: async def get_client(self) -> ClientSession: if self._client is None: - self._client = await self.client_factory() + self._client = await self.client_factory(auth=self.session_auth) return self._client @@ -88,6 +90,7 @@ async def get( path: Path, json: JsonData = None, headers: Optional[Headers] = None, + auth: Optional[BasicAuth] = None, ) -> Result: path = self.path_for(path) if not headers: @@ -97,6 +100,7 @@ async def get( path, json=json, headers=headers, + auth=auth, ssl=self.ssl_mode, ) as res: # required for forcing the response body conversion to json despite missing valid Accept headers @@ -107,6 +111,7 @@ async def delete( self, path: Path, headers: Optional[Headers] = None, + auth: Optional[BasicAuth] = None, ) -> Result: path = self.path_for(path) if not headers: @@ -115,6 +120,7 @@ async def delete( async with client.delete( path, headers=headers, + auth=auth, ssl=self.ssl_mode, ) as res: json_result = {} if res.status == 204 else await res.json() @@ -125,6 +131,7 @@ async def post( path: Path, json: JsonData, headers: Optional[Headers] = None, + auth: Optional[BasicAuth] = None, ) -> Result: path = self.path_for(path) if not headers: @@ -134,6 +141,7 @@ async def post( async with client.post( path, headers=headers, + auth=auth, json=json, ssl=self.ssl_mode, ) as res: @@ -145,6 +153,7 @@ async def put( path: Path, json: JsonData, headers: Optional[Headers] = None, + auth: Optional[BasicAuth] = None, ) -> Result: path = self.path_for(path) if not headers: @@ -154,6 +163,7 @@ async def put( async with client.put( path, headers=headers, + auth=auth, json=json, ssl=self.ssl_mode, ) as res: @@ -165,12 +175,14 @@ async def put_with_data( path: Path, data: JsonData, headers: Optional[Headers], + auth: Optional[BasicAuth] = None, ) -> Result: path = self.path_for(path) client = await self.get_client() async with client.put( path, headers=headers, + auth=auth, data=data, ssl=self.ssl_mode, ) as res: diff --git a/karapace/config.py b/karapace/config.py index 5713e656b..83e399348 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -41,7 +41,10 @@ "server_tls_keyfile": None, "registry_host": "127.0.0.1", "registry_port": 8081, + "registry_user": None, + "registry_password": None, "registry_ca": None, + "registry_authfile": None, "log_level": "DEBUG", "log_format": "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s", "master_eligibility": True, diff --git a/karapace/rapu.py b/karapace/rapu.py index 0b9238883..dc859b5b0 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -78,7 +78,7 @@ def __init__( self.accepts = accepts self.path_for_stats = path_for_stats self.method = method - self.json = None + self.json: Optional[dict] = None @overload def get_header(self, header: str) -> Optional[str]: @@ -255,6 +255,7 @@ async def _handle_request( callback_with_request=False, json_request=False, rest_request=False, + user=None, ): start_time = time.monotonic() resp = None @@ -316,6 +317,9 @@ async def _handle_request( content_type = self.check_schema_headers(rapu_request) callback_kwargs["content_type"] = content_type + if user is not None: + callback_kwargs["user"] = user + try: data = await callback(**callback_kwargs) status = HTTPStatus.OK @@ -394,7 +398,18 @@ async def _handle_request( return resp - def route(self, path, *, callback, method, schema_request=False, with_request=None, json_body=None, rest_request=False): + def route( + self, + path, + *, + callback, + method, + schema_request=False, + with_request=None, + json_body=None, + rest_request=False, + auth=None, + ): # pretty path for statsd reporting path_for_stats = re.sub(r"<[\w:]+>", "x", path) @@ -410,6 +425,11 @@ def route(self, path, *, callback, method, schema_request=False, with_request=No json_body = True async def wrapped_callback(request): + if auth is not None: + user = auth.authenticate(request) + else: + user = None + return await self._handle_request( request=request, path_for_stats=path_for_stats, @@ -418,6 +438,7 @@ async def wrapped_callback(request): callback_with_request=with_request, json_request=json_body, rest_request=rest_request, + user=user, ) async def wrapped_cors(request): diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 59b9f3219..ab0679f89 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -3,6 +3,7 @@ from enum import Enum, unique from http import HTTPStatus from kafka import KafkaProducer +from karapace.auth import HTTPAuthorizer, Operation, User from karapace.compatibility import check_compatibility, CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible from karapace.config import Config @@ -10,17 +11,21 @@ from karapace.master_coordinator import MasterCoordinator from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema -from karapace.schema_reader import KafkaSchemaReader, SchemaType, TypedSchema +from karapace.schema_reader import KafkaSchemaReader, SchemaType, SubjectData, TypedSchema from karapace.typing import JsonData from karapace.utils import json_encode, KarapaceKafkaClient -from typing import Any, Dict, NoReturn, Optional, Tuple +from typing import Any, Dict, NoReturn, Optional, Tuple, Union import aiohttp +import aiohttp.web import async_timeout import asyncio import json +import logging import time +log = logging.getLogger(__name__) + @unique class SchemaErrorCodes(Enum): @@ -55,6 +60,12 @@ class SchemaErrorMessages(Enum): class KarapaceSchemaRegistry(KarapaceBase): def __init__(self, config: Config) -> None: super().__init__(config=config) + + self._auth: Optional[HTTPAuthorizer] = None + if self.config["registry_authfile"] is not None: + self._auth = HTTPAuthorizer(str(self.config["registry_authfile"])) + self.app.on_startup.append(self._start_authorizer) + self._add_schema_registry_routes() self.producer = self._create_producer() self.mc = MasterCoordinator(config=self.config) @@ -64,12 +75,17 @@ def __init__(self, config: Config) -> None: self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() - self._forward_client = None + self._forward_client: Optional[aiohttp.ClientSession] = None self.app.on_startup.append(self._create_forward_client) - async def _create_forward_client(self, app): # pylint: disable=unused-argument + async def _create_forward_client(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument + """Callback for aiohttp.Application.on_startup""" self._forward_client = aiohttp.ClientSession(headers={"User-Agent": SERVER_NAME}) + async def _start_authorizer(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument + """Callback for aiohttp.Application.on_startup""" + await self._auth.start_refresh_task(self.stats) + def _create_producer(self) -> KafkaProducer: while True: try: @@ -92,12 +108,18 @@ def _create_producer(self) -> KafkaProducer: self.log.exception("Unable to create producer, retrying") time.sleep(1) + def _check_authorization(self, user: Optional[User], operation: Operation, resource: str) -> None: + if self._auth: + if not self._auth.check_authorization(user, operation, resource): + self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN) + def _add_schema_registry_routes(self) -> None: self.route( "/compatibility/subjects//versions/", callback=self.compatibility_check, method="POST", schema_request=True, + auth=self._auth, ) self.route( "/config/", @@ -106,26 +128,78 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=True, json_body=False, + auth=self._auth, + ) + self.route( + "/config/", + callback=self.config_subject_set, + method="PUT", + schema_request=True, + auth=self._auth, + ) + self.route( + "/config", + callback=self.config_get, + method="GET", + schema_request=True, + auth=self._auth, + ) + self.route( + "/config", + callback=self.config_set, + method="PUT", + schema_request=True, + auth=self._auth, + ) + self.route( + "/schemas/ids//versions", + callback=self.schemas_get_versions, + method="GET", + schema_request=True, + auth=self._auth, + ) + self.route( + "/schemas/ids/", + callback=self.schemas_get, + method="GET", + schema_request=True, + auth=self._auth, + ) + self.route("/schemas/types", callback=self.schemas_types, method="GET", schema_request=True, auth=None) + self.route( + "/subjects", + callback=self.subjects_list, + method="GET", + schema_request=True, + auth=self._auth, + ) + self.route( + "/subjects//versions", + callback=self.subject_post, + method="POST", + schema_request=True, + auth=self._auth, ) - self.route("/config/", callback=self.config_subject_set, method="PUT", schema_request=True) - self.route("/config", callback=self.config_get, method="GET", schema_request=True) - self.route("/config", callback=self.config_set, method="PUT", schema_request=True) self.route( - "/schemas/ids//versions", callback=self.schemas_get_versions, method="GET", schema_request=True + "/subjects/", + callback=self.subjects_schema_post, + method="POST", + schema_request=True, + auth=self._auth, ) - self.route("/schemas/ids/", callback=self.schemas_get, method="GET", schema_request=True) - self.route("/schemas/types", callback=self.schemas_types, method="GET", schema_request=True) - self.route("/subjects", callback=self.subjects_list, method="GET", schema_request=True) - self.route("/subjects//versions", callback=self.subject_post, method="POST", schema_request=True) - self.route("/subjects/", callback=self.subjects_schema_post, method="POST", schema_request=True) self.route( - "/subjects//versions", callback=self.subject_versions_list, method="GET", schema_request=True + "/subjects//versions", + callback=self.subject_versions_list, + method="GET", + schema_request=True, + auth=self._auth, ) self.route( "/subjects//versions/", callback=self.subject_version_get, method="GET", schema_request=True, + auth=self._auth, ) self.route( "/subjects//versions/", # needs @@ -134,12 +208,14 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=True, json_body=False, + auth=self._auth, ) self.route( "/subjects//versions//schema", callback=self.subject_version_schema_get, method="GET", schema_request=True, + auth=self._auth, ) self.route( "/subjects/", @@ -148,6 +224,7 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=True, json_body=False, + auth=self._auth, ) async def close(self) -> None: @@ -158,8 +235,10 @@ async def close(self) -> None: stack.enter_context(closing(self.producer)) if self._forward_client: stack.push_async_callback(self._forward_client.close) + if self._auth is not None: + stack.push_async_callback(self._auth.close) - def _subject_get(self, subject, content_type, include_deleted=False) -> Dict[str, Any]: + def _subject_get(self, subject: str, content_type: str, include_deleted: bool = False) -> Dict[str, Any]: subject_data = self.ksr.subjects.get(subject) if not subject_data: self.r( @@ -186,7 +265,7 @@ def _subject_get(self, subject, content_type, include_deleted=False) -> Dict[str subject_data["schemas"] = schemas return subject_data - def _validate_version(self, content_type, version): # pylint: disable=inconsistent-return-statements + def _validate_version(self, content_type: str, version: str): # pylint: disable=inconsistent-return-statements try: version_number = int(version) if version_number > 0: @@ -206,7 +285,7 @@ def _validate_version(self, content_type, version): # pylint: disable=inconsist status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - def _get_compatibility_mode(self, subject, content_type) -> CompatibilityModes: + def _get_compatibility_mode(self, subject: SubjectData, content_type: str) -> CompatibilityModes: compatibility = subject.get("compatibility", self.ksr.config["compatibility"]) try: @@ -294,8 +373,13 @@ def send_delete_subject_message(self, subject, version): value = '{{"subject":"{}","version":{}}}'.format(subject, version) return self.send_kafka_message(key, value) - async def compatibility_check(self, content_type, *, subject, version, request): + async def compatibility_check( + self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: Optional[User] = None + ) -> None: """Check for schema compatibility""" + + self._check_authorization(user, Operation.Read, f"Subject:{subject}") + body = request.json schema_type = self._validate_schema_type(content_type=content_type, data=body) try: @@ -309,7 +393,7 @@ async def compatibility_check(self, content_type, *, subject, version, request): content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, return_dict=True) + old = await self._subject_version_get(content_type=content_type, subject=subject, version=version) old_schema_type = self._validate_schema_type(content_type=content_type, data=old) try: old_schema = ValidatedTypedSchema.parse(old_schema_type, old["schema"]) @@ -334,7 +418,7 @@ async def compatibility_check(self, content_type, *, subject, version, request): self.r({"is_compatible": False}, content_type) self.r({"is_compatible": True}, content_type) - async def schemas_get(self, content_type, *, schema_id): + async def schemas_get(self, content_type: str, *, user: Optional[User] = None, schema_id: str) -> None: try: schema_id_int = int(schema_id) except ValueError: @@ -348,6 +432,26 @@ async def schemas_get(self, content_type, *, schema_id): ) with self.ksr.id_lock: schema = self.ksr.schemas.get(schema_id_int) + + def _has_subject_with_id() -> bool: + with self.ksr.id_lock: + for subject, val in self.ksr.subjects.items(): + if "schemas" not in val: + continue + for schema in val["schemas"].values(): + if ( + int(schema["id"]) == schema_id_int + and not schema["deleted"] + and self._auth is not None + and self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}") + ): + return True + return False + + if self._auth: + if not _has_subject_with_id(): + schema = None + if not schema: self.r( body={ @@ -362,7 +466,7 @@ async def schemas_get(self, content_type, *, schema_id): response_body["schemaType"] = schema.schema_type self.r(response_body, content_type) - async def schemas_get_versions(self, content_type, *, schema_id): + async def schemas_get_versions(self, content_type: str, *, schema_id: str, user: Optional[User] = None) -> None: try: schema_id_int = int(schema_id) except ValueError: @@ -378,6 +482,9 @@ async def schemas_get_versions(self, content_type, *, schema_id): subject_versions = [] with self.ksr.id_lock: for subject, val in self.ksr.subjects.items(): + if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"): + continue + if self.ksr.get_schemas(subject) and "schemas" in val: schemas = val["schemas"] for version, schema in schemas.items(): @@ -386,15 +493,19 @@ async def schemas_get_versions(self, content_type, *, schema_id): subject_versions = sorted(subject_versions, key=lambda s: (s["subject"], s["version"])) self.r(subject_versions, content_type) - async def schemas_types(self, content_type): + async def schemas_types(self, content_type: str) -> None: self.r(["JSON", "AVRO", "PROTOBUF"], content_type) - async def config_get(self, content_type): + async def config_get(self, content_type: str, *, user: Optional[User] = None) -> None: + self._check_authorization(user, Operation.Read, "Config:") + # Note: The format sent by the user differs from the return value, this # is for compatibility reasons. self.r({"compatibilityLevel": self.ksr.config["compatibility"]}, content_type) - async def config_set(self, content_type, *, request): + async def config_set(self, content_type: str, *, request: HTTPRequest, user: Optional[User] = None) -> None: + self._check_authorization(user, Operation.Write, "Config:") + body = request.json try: @@ -416,11 +527,15 @@ async def config_set(self, content_type, *, request): self.no_master_error(content_type) else: url = f"{master_url}/config" - await self._forward_request_remote(body=body, url=url, content_type=content_type, method="PUT") + await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT") self.r({"compatibility": self.ksr.config["compatibility"]}, content_type) - async def config_subject_get(self, content_type, subject: str, *, request: HTTPRequest): + async def config_subject_get( + self, content_type: str, subject: str, *, request: HTTPRequest, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Read, f"Subject:{subject}") + # Config for a subject can exist without schemas so no need to check for their existence assert self.ksr, "KarapaceSchemaRegistry not initialized. Missing call to _init" subject_data = self.ksr.subjects.get(subject, {}) @@ -447,7 +562,11 @@ async def config_subject_get(self, content_type, subject: str, *, request: HTTPR status=HTTPStatus.NOT_FOUND, ) - async def config_subject_set(self, content_type, *, request, subject): + async def config_subject_set( + self, content_type: str, *, request: HTTPRequest, user: Optional[User] = None, subject: str + ) -> None: + self._check_authorization(user, Operation.Write, f"Subject:{subject}") + try: compatibility_level = CompatibilityModes(request.json["compatibility"]) except (ValueError, KeyError): @@ -467,15 +586,24 @@ async def config_subject_set(self, content_type, *, request, subject): self.no_master_error(content_type) else: url = f"{master_url}/config/{subject}" - await self._forward_request_remote(body=request.json, url=url, content_type=content_type, method="PUT") + await self._forward_request_remote( + request=request, body=request.json, url=url, content_type=content_type, method="PUT" + ) self.r({"compatibility": compatibility_level.value}, content_type) - async def subjects_list(self, content_type): - subjects_list = [key for key, val in self.ksr.subjects.items() if self.ksr.get_schemas(key)] - self.r(subjects_list, content_type, status=HTTPStatus.OK) + async def subjects_list(self, content_type: str, *, user: Optional[User] = None) -> None: + subjects = [key for key, val in self.ksr.subjects.items() if self.ksr.get_schemas(key)] + if self._auth is not None: + subjects = list( + filter( + lambda subject: self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"), + subjects, + ) + ) + self.r(subjects, content_type, status=HTTPStatus.OK) - async def _subject_delete_local(self, content_type: str, subject: str, permanent: bool): + async def _subject_delete_local(self, content_type: str, subject: str, permanent: bool) -> None: subject_data = self._subject_get(subject, content_type, include_deleted=permanent) if permanent and [version for version, value in subject_data["schemas"].items() if not value.get("deleted", False)]: @@ -503,7 +631,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent self.send_delete_subject_message(subject, latest_schema_id) self.r(version_list, content_type, status=HTTPStatus.OK) - async def subject_delete(self, content_type, *, subject, request: HTTPRequest): + async def subject_delete( + self, content_type: str, *, subject: str, request: HTTPRequest, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Write, f"Subject:{subject}") + permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.get_master() @@ -514,9 +646,9 @@ async def subject_delete(self, content_type, *, subject, request: HTTPRequest): self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" - await self._forward_request_remote(body={}, url=url, content_type=content_type, method="DELETE") + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") - async def subject_version_get(self, content_type, *, subject, version, return_dict=False): + async def _subject_version_get(self, *, content_type: str, subject: str, version: str) -> dict: self._validate_version(content_type, version) subject_data = self._subject_get(subject, content_type) schema_data = None @@ -555,14 +687,20 @@ async def subject_version_get(self, content_type, *, subject, version, return_di } if schema.schema_type is not SchemaType.AVRO: ret["schemaType"] = schema.schema_type - if return_dict: - # Return also compatibility information to compatibility check - if subject_data.get("compatibility"): - ret["compatibility"] = subject_data.get("compatibility") - return ret - self.r(ret, content_type) - - async def _subject_version_delete_local(self, content_type: str, subject: str, version: int, permanent: bool): + # Return also compatibility information to compatibility check + if subject_data.get("compatibility"): + ret["compatibility"] = subject_data.get("compatibility") + return ret + + async def subject_version_get( + self, content_type: str, *, subject: str, version: str, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Read, f"Subject:{subject}") + + result = await self._subject_version_get(content_type=content_type, subject=subject, version=version) + self.r(result, content_type) + + async def _subject_version_delete_local(self, content_type: str, subject: str, version: int, permanent: bool) -> None: subject_data = self._subject_get(subject, content_type, include_deleted=True) subject_schema_data = subject_data["schemas"].get(version, None) @@ -605,21 +743,28 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v ) self.r(str(version), content_type, status=HTTPStatus.OK) - async def subject_version_delete(self, content_type, *, subject, version, request: HTTPRequest): - version = int(version) + async def subject_version_delete( + self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Write, f"Subject:{subject}") + version_number = int(version) permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.get_master() if are_we_master: async with self.schema_lock: - await self._subject_version_delete_local(content_type, subject, version, permanent) + await self._subject_version_delete_local(content_type, subject, version_number, permanent) elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" - await self._forward_request_remote(body={}, url=url, content_type=content_type, method="DELETE") + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") + + async def subject_version_schema_get( + self, content_type: str, *, subject: str, version: str, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Read, f"Subject:{subject}") - async def subject_version_schema_get(self, content_type, *, subject, version): self._validate_version(content_type, version) subject_data = self._subject_get(subject, content_type) @@ -639,9 +784,12 @@ async def subject_version_schema_get(self, content_type, *, subject, version): ) self.r(schema_data["schema"].schema_str, content_type) - async def subject_versions_list(self, content_type, *, subject): + async def subject_versions_list(self, content_type: str, *, subject: str, user: Optional[User] = None) -> None: + self._check_authorization(user, Operation.Read, f"Subject:{subject}") + subject_data = self._subject_get(subject, content_type) - self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK) + schemas = list(subject_data["schemas"]) + self.r(schemas, content_type, status=HTTPStatus.OK) async def get_master(self) -> Tuple[bool, Optional[str]]: async with self._master_lock: @@ -655,7 +803,7 @@ async def get_master(self) -> Tuple[bool, Optional[str]]: return are_we_master, master_url await asyncio.sleep(1.0) - def _validate_schema_request_body(self, content_type, body) -> None: + def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any]) -> None: if not isinstance(body, dict): self.r( body={ @@ -665,12 +813,12 @@ def _validate_schema_request_body(self, content_type, body) -> None: content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) - for field in body: - if field not in {"schema", "schemaType"}: + for attr in body: + if attr not in {"schema", "schemaType"}: self.r( body={ "error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - "message": f"Unrecognized field: {field}", + "message": f"Unrecognized field: {attr}", }, content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, @@ -691,7 +839,7 @@ def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType ) return schema_type - def _validate_schema_key(self, content_type, body) -> None: + def _validate_schema_key(self, content_type: str, body: dict) -> None: if "schema" not in body: self.r( body={ @@ -702,7 +850,11 @@ def _validate_schema_key(self, content_type, body) -> None: status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - async def subjects_schema_post(self, content_type, *, subject, request): + async def subjects_schema_post( + self, content_type: str, *, subject: str, request: HTTPRequest, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Write, f"Subject:{subject}") + body = request.json self._validate_schema_request_body(content_type, body) subject_data = self._subject_get(subject, content_type) @@ -756,7 +908,11 @@ async def subjects_schema_post(self, content_type, *, subject, request): status=HTTPStatus.NOT_FOUND, ) - async def subject_post(self, content_type, *, subject, request): + async def subject_post( + self, content_type: str, *, subject: str, request: HTTPRequest, user: Optional[User] = None + ) -> None: + self._check_authorization(user, Operation.Write, f"Subject:{subject}") + body = request.json self.log.debug("POST with subject: %r, request: %r", subject, body) self._validate_schema_request_body(content_type, body) @@ -770,7 +926,7 @@ async def subject_post(self, content_type, *, subject, request): self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions" - await self._forward_request_remote(body=body, url=url, content_type=content_type, method="POST") + await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST") def write_new_schema_local( self, @@ -903,14 +1059,21 @@ def write_new_schema_local( ) self.r({"id": schema_id}, content_type) - async def _forward_request_remote(self, *, body, url, content_type, method="POST"): + async def _forward_request_remote( + self, *, request: HTTPRequest, body: Optional[dict], url: str, content_type: str, method: str = "POST" + ) -> None: assert self._forward_client is not None, "Server must be initialized" - self.log.info("Writing new schema to remote url: %r since we're not the master", url) + self.log.info("Forwarding %s request to remote url: %r since we're not the master", method, url) timeout = 60.0 func = getattr(self._forward_client, method.lower()) + auth_header = request.headers.get("Authorization") + headers = {} + if auth_header is not None: + headers["Authorization"] = auth_header + with async_timeout.timeout(timeout): - async with func(url, json=body) as response: + async with func(url, headers=headers, json=body) as response: if response.headers.get("content-type", "").startswith(JSON_CONTENT_TYPE): resp_content = await response.json() else: @@ -918,7 +1081,7 @@ async def _forward_request_remote(self, *, body, url, content_type, method="POST self.r(body=resp_content, content_type=content_type, status=HTTPStatus(response.status)) - def no_master_error(self, content_type): + def no_master_error(self, content_type: str) -> None: self.r( body={ "error_code": SchemaErrorCodes.NO_MASTER_ERROR.value, diff --git a/karapace/serialization.py b/karapace/serialization.py index 306c3114f..5d870a6c1 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -1,3 +1,4 @@ +from aiohttp import BasicAuth from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter from google.protobuf.message import DecodeError from jsonschema import ValidationError @@ -65,8 +66,13 @@ def topic_record_name_strategy(topic_name: str, record_name: str) -> str: class SchemaRegistryClient: - def __init__(self, schema_registry_url: str = "http://localhost:8081", server_ca: Optional[str] = None): - self.client = Client(server_uri=schema_registry_url, server_ca=server_ca) + def __init__( + self, + schema_registry_url: str = "http://localhost:8081", + server_ca: Optional[str] = None, + session_auth: Optional[BasicAuth] = None, + ): + self.client = Client(server_uri=schema_registry_url, server_ca=server_ca, session_auth=session_auth) self.base_url = schema_registry_url async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> int: @@ -118,12 +124,17 @@ def __init__( ) -> None: self.config = config self.state_lock = asyncio.Lock() + session_auth: Optional[BasicAuth] = None + if self.config.get("registry_user") and self.config.get("registry_password"): + session_auth = BasicAuth(self.config.get("registry_user"), self.config.get("registry_password"), encoding="utf8") if self.config.get("registry_ca"): registry_url = f"https://{self.config['registry_host']}:{self.config['registry_port']}" - registry_client = SchemaRegistryClient(registry_url, server_ca=self.config["registry_ca"]) + registry_client = SchemaRegistryClient( + registry_url, server_ca=self.config["registry_ca"], session_auth=session_auth + ) else: registry_url = f"http://{self.config['registry_host']}:{self.config['registry_port']}" - registry_client = SchemaRegistryClient(registry_url) + registry_client = SchemaRegistryClient(registry_url, session_auth=session_auth) self.subject_name_strategy = NAME_STRATEGIES[name_strategy] self.registry_client: Optional[SchemaRegistryClient] = registry_client self.ids_to_schemas: Dict[int, TypedSchema] = {} diff --git a/setup.py b/setup.py index 5be67c065..67730a928 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,7 @@ "console_scripts": [ "karapace = karapace.karapace_all:main", "karapace_schema_backup = karapace.schema_backup:main", + "karapace_mkpasswd = karapace.auth:main", ], }, author="Hannu Valtonen", diff --git a/tests/integration/config/karapace.auth.json b/tests/integration/config/karapace.auth.json new file mode 100644 index 000000000..28aed41cd --- /dev/null +++ b/tests/integration/config/karapace.auth.json @@ -0,0 +1,39 @@ +{ + "users": [ + { + "username": "admin", + "algorithm": "scrypt", + "salt": "adminsalt", + "password_hash": "AG7+ttm/skHjqEZ90yzEkgjy75yaqoucNnQPIP2S++1rYOPXEtm4+AF7WdDGj+J0sIPFdYC9nXYsvkB1xYDYkw==" + }, + { + "username": "aladdin", + "algorithm": "sha512", + "salt": "salt", + "password_hash": "uofa7Uw+IolY0GDeCQApTVAUPlBthFYPFQ+QN+kKtXBzSwddiyLrNnIC2UQcA5X+hCL/mHNNqEMA30oIM4oqxA==" + }, + { + "username": "reader", + "algorithm": "sha1", + "salt": "third-salt", + "password_hash": "SeUTCvKCE8rj6lHFkyFkhopktuY=" + } + ], + "permissions": [ + { + "username": "admin", + "operation": "Write", + "resource": ".*" + }, + { + "username": "aladdin", + "operation": "Write", + "resource": "Subject:cave-.*" + }, + { + "username": "reader", + "operation": "Read", + "resource": "Subject:carpet-.*" + } + ] +} diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 33fe815db..ad4550ae5 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,8 +5,8 @@ See LICENSE for details """ from _pytest.fixtures import SubRequest -from aiohttp import ClientSession from aiohttp.pytest_plugin import AiohttpClient +from aiohttp.test_utils import TestClient from contextlib import closing, ExitStack from dataclasses import asdict from filelock import FileLock @@ -29,6 +29,7 @@ from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request from typing import AsyncIterator, Iterator, List, Optional +from urllib.parse import urlparse import asyncio import json @@ -258,7 +259,7 @@ async def fixture_rest_async_client( client = Client(server_uri=rest_url) else: - async def get_client() -> ClientSession: + async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument return await aiohttp_client(rest_async.app) client = Client(client_factory=get_client) @@ -279,6 +280,77 @@ async def get_client() -> ClientSession: await client.close() +@pytest.fixture(scope="function", name="rest_async_registry_auth") +async def fixture_rest_async_registry_auth( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + kafka_servers: KafkaServers, + registry_async_client_auth: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + + # Do not start a REST api when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + rest_url = request.config.getoption("rest_url") + if rest_url: + yield None + return + + registry = urlparse(registry_async_client_auth.server_uri) + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "admin_metadata_max_age": 2, + "registry_host": registry.hostname, + "registry_port": registry.port, + "registry_user": "admin", + "registry_password": "admin", + } + ) + rest = KafkaRest(config=config) + + try: + yield rest + finally: + await rest.close() + + +@pytest.fixture(scope="function", name="rest_async_client_registry_auth") +async def fixture_rest_async_client_registry_auth( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + rest_async_registry_auth: KafkaRest, + aiohttp_client: AiohttpClient, +) -> AsyncIterator[Client]: + rest_url = request.config.getoption("rest_url") + + # client and server_uri are incompatible settings. + if rest_url: + client = Client(server_uri=rest_url) + else: + + async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument + return await aiohttp_client(rest_async_registry_auth.app) + + client = Client(client_factory=get_client) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "brokers", + json_data=None, + headers=None, + error_msg="REST API is unreachable", + timeout=10, + sleep=0.3, + ) + yield client + finally: + await client.close() + + @pytest.fixture(scope="function", name="registry_async_pair") async def fixture_registry_async_pair( request: SubRequest, @@ -286,7 +358,7 @@ async def fixture_registry_async_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> Iterator[List[str]]: +) -> AsyncIterator[List[str]]: """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} @@ -435,3 +507,86 @@ async def fixture_registry_async_client_tls( yield client finally: await client.close() + + +@pytest.fixture(scope="function", name="registry_http_auth_endpoint") +async def fixture_registry_http_auth_endpoint( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + session_logdir: Path, + kafka_servers: KafkaServers, + port_range: PortRangeInclusive, +) -> AsyncIterator[str]: + # Do not start a registry when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + registry_url = request.config.getoption("registry_url") + if registry_url: + yield registry_url + return + + config = { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "registry_authfile": "tests/integration/config/karapace.auth.json", + } + async with start_schema_registry_cluster( + config_templates=[config], + data_dir=session_logdir / _clear_test_name(request.node.name), + port_range=port_range, + ) as servers: + yield servers[0].endpoint.to_url() + + +@pytest.fixture(scope="function", name="registry_async_client_auth") +async def fixture_registry_async_client_auth( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + registry_http_auth_endpoint: str, +) -> AsyncIterator[Client]: + client = Client( + server_uri=registry_http_auth_endpoint, + server_ca=request.config.getoption("server_ca"), + ) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "schemas/types", + json_data=None, + headers=None, + error_msg=f"Registry API {registry_http_auth_endpoint} is unreachable", + timeout=10, + sleep=0.3, + ) + yield client + finally: + await client.close() + + +@pytest.fixture(scope="function", name="registry_async_auth_pair") +async def fixture_registry_async_auth_pair( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + session_logdir: Path, + kafka_servers: KafkaServers, + port_range: PortRangeInclusive, +) -> AsyncIterator[List[str]]: + """Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints.""" + + config1: Config = { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "registry_authfile": "tests/integration/config/karapace.auth.json", + } + config2: Config = { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "registry_authfile": "tests/integration/config/karapace.auth.json", + } + + async with start_schema_registry_cluster( + config_templates=[config1, config2], + data_dir=session_logdir / _clear_test_name(request.node.name), + port_range=port_range, + ) as endpoints: + yield [server.endpoint.to_url() for server in endpoints] diff --git a/tests/integration/test_schema_registry_auth.py b/tests/integration/test_schema_registry_auth.py new file mode 100644 index 000000000..4d5a52edc --- /dev/null +++ b/tests/integration/test_schema_registry_auth.py @@ -0,0 +1,216 @@ +""" +karapace - schema registry authentication and authorization tests + +Copyright (c) 2022 Aiven Ltd +See LICENSE for details +""" +from karapace.client import Client +from karapace.kafka_rest_apis import KafkaRestAdminClient +from karapace.schema_models import SchemaType, ValidatedTypedSchema +from tests.utils import ( + new_random_name, + new_topic, + schema_avro_json, + schema_jsonschema_json, + test_objects_avro, + wait_for_topics, +) +from typing import List +from urllib.parse import quote + +import aiohttp +import asyncio +import requests + +NEW_TOPIC_TIMEOUT = 10 + +admin = aiohttp.BasicAuth("admin", "admin") +aladdin = aiohttp.BasicAuth("aladdin", "opensesame") +reader = aiohttp.BasicAuth("reader", "secret") + + +async def test_sr_auth(registry_async_client_auth: Client) -> None: + subject = new_random_name("cave-") + + res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + assert res.status_code == 401 + + res = await registry_async_client_auth.post( + f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, auth=aladdin + ) + assert res.status_code == 200 + sc_id = res.json()["id"] + assert sc_id >= 0 + + res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest") + assert res.status_code == 401 + res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin) + assert res.status_code == 200 + assert sc_id == res.json()["id"] + assert ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) == ValidatedTypedSchema.parse( + SchemaType.AVRO, res.json()["schema"] + ) + + +async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None: + """Test endpoints for authorization""" + + subject = new_random_name("any-") + + res = await registry_async_client_auth.post( + f"compatibility/subjects/{quote(subject)}/versions/1", json={"schema": schema_avro_json} + ) + assert res.status_code == 401 + + res = await registry_async_client_auth.get(f"config/{quote(subject)}") + assert res.status_code == 401 + + res = await registry_async_client_auth.put(f"config/{quote(subject)}", json={"compatibility": "NONE"}) + assert res.status_code == 401 + + res = await registry_async_client_auth.get("config") + assert res.status_code == 401 + + res = await registry_async_client_auth.put("config", json={"compatibility": "NONE"}) + assert res.status_code == 401 + + res = await registry_async_client_auth.get("schemas/ids/1/versions") + assert res.status_code == 401 + + # This is an exception that does not require authorization + res = await registry_async_client_auth.get("schemas/types") + assert res.status_code == 200 + + # but let's verify it answers normally if sending authorization header + res = await registry_async_client_auth.get("schemas/types", auth=admin) + assert res.status_code == 200 + + res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + assert res.status_code == 401 + + res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}/versions/1") + assert res.status_code == 401 + + res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/schema") + assert res.status_code == 401 + + res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}") + assert res.status_code == 401 + + +async def test_sr_list_subjects(registry_async_client_auth: Client) -> None: + cavesubject = new_random_name("cave-") + carpetsubject = new_random_name("carpet-") + + res = await registry_async_client_auth.post( + f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin + ) + assert res.status_code == 200 + sc_id = res.json()["id"] + assert sc_id >= 0 + + res = await registry_async_client_auth.post( + f"subjects/{quote(carpetsubject)}/versions", json={"schema": schema_avro_json}, auth=admin + ) + assert res.status_code == 200 + + res = await registry_async_client_auth.get("subjects", auth=admin) + assert res.status_code == 200 + assert [cavesubject, carpetsubject] == res.json() + + res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions") + assert res.status_code == 401 + + res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin) + assert res.status_code == 200 + assert [sc_id] == res.json() + + res = await registry_async_client_auth.get("subjects", auth=aladdin) + assert res.status_code == 200 + assert [cavesubject] == res.json() + + res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=aladdin) + assert res.status_code == 403 + + res = await registry_async_client_auth.get("subjects", auth=reader) + assert res.status_code == 200 + assert [carpetsubject] == res.json() + + res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader) + assert res.status_code == 200 + assert [1] == res.json() + + +async def test_sr_ids(registry_async_client_auth: Client) -> None: + cavesubject = new_random_name("cave-") + carpetsubject = new_random_name("carpet-") + + res = await registry_async_client_auth.post( + f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin + ) + assert res.status_code == 200 + avro_sc_id = res.json()["id"] + assert avro_sc_id >= 0 + + res = await registry_async_client_auth.post( + f"subjects/{quote(carpetsubject)}/versions", + json={"schemaType": "JSON", "schema": schema_jsonschema_json}, + auth=admin, + ) + assert res.status_code == 200 + jsonschema_sc_id = res.json()["id"] + assert jsonschema_sc_id >= 0 + + res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin) + assert res.status_code == 200 + + res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=aladdin) + assert res.status_code == 404 + assert {"error_code": 40403, "message": "Schema not found"} == res.json() + + res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader) + assert res.status_code == 404 + assert {"error_code": 40403, "message": "Schema not found"} == res.json() + + res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader) + assert res.status_code == 200 + + +async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: + auth = requests.auth.HTTPBasicAuth("admin", "admin") + + # Test primary/replica forwarding with global config setting + primary_url, replica_url = registry_async_auth_pair + max_tries, counter = 5, 0 + wait_time = 0.5 + for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: + resp = requests.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth) + assert resp.ok + while True: + if counter >= max_tries: + raise Exception("Compat update not propagated") + resp = requests.get(f"{primary_url}/config", auth=auth) + if not resp.ok: + continue + data = resp.json() + if "compatibilityLevel" not in data: + counter += 1 + await asyncio.sleep(wait_time) + continue + if data["compatibilityLevel"] != compat: + counter += 1 + await asyncio.sleep(wait_time) + continue + break + + +# Test that Kafka REST API works when configured with Schema Registry requiring authorization +async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaRestAdminClient) -> None: + client = rest_async_client_registry_auth + + topic = new_topic(admin_client, prefix="cave-rest-") + await wait_for_topics(client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + + payload = {"value_schema": schema_avro_json, "records": [{"value": o} for o in test_objects_avro]} + res = await client.post(f"topics/{topic}", payload, headers={"Content-Type": "application/vnd.kafka.avro.v1+json"}) + assert res.ok