Skip to content

Commit

Permalink
Merge pull request #406 from aiven/schema-registry-auth
Browse files Browse the repository at this point in the history
Schema registry auth
  • Loading branch information
ivanyu authored Jun 21, 2022
2 parents b14e429 + 90ad8c9 commit 9f2ddf3
Show file tree
Hide file tree
Showing 12 changed files with 1,020 additions and 78 deletions.
113 changes: 111 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,27 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- Filename to a private key for the Karapace server in HTTPS mode.
* - ``registry_host``
- ``127.0.0.1``
- Kafka Registry host, used by Kafka Rest for Avro related requests.
- Schema Registry host, used by Kafka Rest for schema related requests.
If running both in the same process, it should be left to its default value
* - ``registry_port``
- ``8081``
- Kafka Registry port, used by Kafka Rest for Avro related requests.
- Schema Registry port, used by Kafka Rest for schema related requests.
If running both in the same process, it should be left to its default value
* - ``registry_user``
- ``None``
- Schema Registry user for authentication, used by Kafka Rest for schema related requests.
* - ``registry_password``
- ``None``
- Schema Registry password for authentication, used by Kafka Rest for schema related requests.
* - ``registry_ca``
- ``/path/to/cafile``
- Kafka Registry CA certificate, used by Kafka Rest for Avro related requests.
If this is set, Kafka Rest will use HTTPS to connect to the registry.
If running both in the same process, it should be left to its default value
* - ``registry_authfile``
- ``/path/to/authfile.json``
- Filename to specify users and access control rules for Karapace Schema Registry.
If this is set, Schema Segistry requires authentication for most of the endpoints and applies per endpoint authorization rules.
* - ``metadata_max_age_ms``
- ``60000``
- Period of time in milliseconds after Kafka metadata is force refreshed.
Expand All @@ -424,6 +434,105 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)


Authentication and authorization of Karapace Schema Registry REST API
=====================================================================

To enable HTTP Basic Authentication and user authorization the authorization configuration file is set in the main configuration key ``registry_authfile`` of the Karapace.

Karapace Schema Registry authorization file is an optional JSON configuration, which contains a list of authorized users in ``users`` and a list of access control rules in ``permissions``.

Each user entry contains following attributes:

.. list-table::
:header-rows: 1

* - Parameter
- Description
* - ``username``
- A string
* - ``algorithm``
- One of supported hashing algorithms, ``scrypt``, ``sha1``, ``sha256``, or ``sha512``
* - ``salt``
- Salt used for hashing the password
* - ``password_hash``
- Hash string of the password calculated using given algorithm and salt.

Password hashing can be done using ``karapace_mkpasswd`` tool, if installed, or by invoking directly with ``python -m karapace.auth``. The tool generates JSON entry with these fields. ::

$ karapace_mkpasswd -u user -a sha512 secret
{
"username": "user",
"algorithm": "sha512",
"salt": "iuLouaExTeg9ypqTxqP-dw",
"password_hash": "R6ghYSXdLGsq6hkQcg8wT4xkD4QToxBhlp7NerTnyB077M+mD2qiN7ZxXCDb4aE+5lExu5P11UpMPYAcVYxSQA=="
}

Each access control rule contains following attributes:

.. list-table::
:header-rows: 1

* - Parameter
- Description
* - ``username``
- A string to match against authenticated user
* - ``operation``
- Exact value of ``Read`` or ``Write``. Write implies also read permissions. Write includes all mutable operations, e.g. deleting schema versions
* - ``resource``
- A regular expression used to match against accessed resource.

Supported resource authorization:

.. list-table::
:header-rows: 1

* - Resource
- Description
* - ``Config:``
- Controls authorization to global schema registry configuration.
* - ``Subject:<subject_name>``
- Controls authorization to subject. The ``<subject_name>`` is a regular expression to match against the accessed subject.

Example of complete authorization file
--------------------------------------

::

{
"users": [
{
"username": "admin",
"algorithm": "scrypt",
"salt": "<put salt for randomized hashing here>",
"password_hash": "<put hashed password here>"
},
{
"username": "plainuser",
"algorithm": "sha256",
"salt": "<put salt for randomized hashing here>",
"password_hash": "<put hashed password here>"
}
],
"permissions": [
{
"username": "admin",
"operation": "Write",
"resource": ".*"
},
{
"username": "plainuser",
"operation": "Read",
"resource": "Subject:general.*"
},
{
"username": "plainuser",
"operation": "Read",
"resource": "Config:"
}
]
}

Uninstall
=========

Expand Down
1 change: 1 addition & 0 deletions karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"sasl_plain_password": null,
"karapace_rest": true,
"karapace_registry": true,
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
Expand Down
211 changes: 211 additions & 0 deletions karapace/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
from base64 import b64encode
from dataclasses import dataclass, field
from enum import Enum, unique
from hmac import compare_digest
from karapace.config import InvalidConfiguration
from karapace.rapu import JSON_CONTENT_TYPE
from karapace.statsd import StatsClient
from typing import Optional

import aiohttp
import aiohttp.web
import argparse
import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import secrets
import sys

log = logging.getLogger(__name__)


@unique
class Operation(Enum):
Read = "Read"
Write = "Write"


@unique
class HashAlgorithm(Enum):
SHA1 = "sha1"
SHA256 = "sha256"
SHA512 = "sha512"
SCRYPT = "scrypt"


def hash_password(algorithm: HashAlgorithm, salt: str, plaintext_password: str) -> str:
if algorithm in [HashAlgorithm.SHA1, HashAlgorithm.SHA256, HashAlgorithm.SHA512]:
return b64encode(
hashlib.pbkdf2_hmac(algorithm.value, bytearray(plaintext_password, "UTF-8"), bytearray(salt, "UTF-8"), 5000)
).decode("ascii")
if algorithm == HashAlgorithm.SCRYPT:
return str(
base64.b64encode(
hashlib.scrypt(bytearray(plaintext_password, "utf-8"), salt=bytearray(salt, "utf-8"), n=16384, r=8, p=1)
),
encoding="utf-8",
)
raise NotImplementedError(f"Hash algorithm '{algorithm}' is not implemented")


@dataclass
class User:
username: str
algorithm: HashAlgorithm
salt: str
password_hash: str = field(repr=False)

def compare_password(self, plaintext_password: str) -> bool:
return compare_digest(self.password_hash, hash_password(self.algorithm, self.salt, plaintext_password))


@dataclass(frozen=True)
class ACLEntry:
username: str
operation: Operation
resource: re.Pattern


class HTTPAuthorizer:
def __init__(self, filename: str) -> None:
self._auth_filename: str = filename
self._refresh_auth_task: Optional[asyncio.Task] = None
# Once first, can raise if file not valid
self._load_authfile()

async def start_refresh_task(self, stats: StatsClient) -> None:
"""Start authfile refresher task"""

async def _refresh_authfile() -> None:
"""Reload authfile, but keep old auth data if loading fails"""

last_loaded = os.path.getmtime(self._auth_filename)

while True:
try:
await asyncio.sleep(5)
last_modified = os.path.getmtime(self._auth_filename)
if last_loaded < last_modified:
self._load_authfile()
last_loaded = last_modified
except asyncio.CancelledError:
log.info("Closing schema registry ACL refresh task")
return
except Exception as ex: # pylint: disable=broad-except
log.exception("Schema registry auth file could not be loaded")
stats.unexpected_exception(ex=ex, where="schema_registry_authfile_reloader")

self._refresh_auth_task = asyncio.create_task(_refresh_authfile())

async def close(self) -> None:
if self._refresh_auth_task is not None:
self._refresh_auth_task.cancel()
self._refresh_auth_task = None

def _load_authfile(self) -> None:
try:
with open(self._auth_filename, "r") as authfile:
authdata = json.load(authfile)

users = {
user["username"]: User(
username=user["username"],
algorithm=HashAlgorithm(user["algorithm"]),
salt=user["salt"],
password_hash=user["password_hash"],
)
for user in authdata["users"]
}
permissions = [
ACLEntry(entry["username"], Operation(entry["operation"]), re.compile(entry["resource"]))
for entry in authdata["permissions"]
]
self.userdb = users
log.info(
"Loaded schema registry users: %s",
users,
)
self.permissions = permissions
log.info(
"Loaded schema registry access control rules: %s",
[(entry.username, entry.operation.value, entry.resource.pattern) for entry in permissions],
)
except Exception as ex:
raise InvalidConfiguration("Failed to load auth file") from ex

def check_authorization(self, user: Optional[User], operation: Operation, resource: str) -> bool:
if user is None:
return False

def check_operation(operation: Operation, aclentry: ACLEntry) -> bool:
"""Does ACL entry allow given operation.
An entry at minimum gives Read permission. Write permission implies Read."""
return operation == Operation.Read or aclentry.operation == Operation.Write

def check_resource(resource: str, aclentry: ACLEntry) -> bool:
return aclentry.resource.match(resource) is not None

for aclentry in self.permissions:
if (
aclentry.username == user.username
and check_operation(operation, aclentry)
and check_resource(resource, aclentry)
):
return True
return False

def authenticate(self, request: aiohttp.web.Request) -> User:
auth_header = request.headers.get("Authorization")
if auth_header is None:
raise aiohttp.web.HTTPUnauthorized(
headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'},
text='{"message": "Unauthorized"}',
content_type=JSON_CONTENT_TYPE,
)
try:
auth = aiohttp.BasicAuth.decode(auth_header)
except ValueError:
# pylint: disable=raise-missing-from
raise aiohttp.web.HTTPUnauthorized(
headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'},
text='{"message": "Unauthorized"}',
content_type=JSON_CONTENT_TYPE,
)
user = self.userdb.get(auth.login)
if user is None or not user.compare_password(auth.password):
raise aiohttp.web.HTTPUnauthorized(
headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'},
text='{"message": "Unauthorized"}',
content_type=JSON_CONTENT_TYPE,
)

return user


def main() -> int:
parser = argparse.ArgumentParser(prog="karapace_mkpasswd", description="Karapace password hasher")
parser.add_argument("-u", "--user", help="Username", type=str)
parser.add_argument(
"-a", "--algorithm", help="Hash algorithm", choices=["sha1", "sha256", "sha512", "scrypt"], default="sha512"
)
parser.add_argument(metavar="password", dest="plaintext_password", help="Password to hash", type=str)
parser.add_argument("salt", help="Salt for hashing, random generated if not given", nargs="?", type=str)
args = parser.parse_args()
salt: str = args.salt or secrets.token_urlsafe(nbytes=16)
result = {}
if args.user:
result["username"] = args.user
result["algorithm"] = args.algorithm
result["salt"] = salt
result["password_hash"] = hash_password(HashAlgorithm(args.algorithm), salt, args.plaintext_password)
print(json.dumps(result, indent=4))
return 0


if __name__ == "__main__":
sys.exit(main())
Loading

0 comments on commit 9f2ddf3

Please sign in to comment.