Skip to content

Commit

Permalink
feat: add the Karapace version to the healthcheck response
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Oct 23, 2023
1 parent 118ba6b commit bd48eb9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
6 changes: 5 additions & 1 deletion karapace/karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from karapace.rapu import HTTPRequest, HTTPResponse, RestApp
from karapace.typing import JsonObject
from karapace.utils import json_encode
from karapace.version import __version__
from typing import Awaitable, Callable, NoReturn
from typing_extensions import TypeAlias

Expand Down Expand Up @@ -79,7 +80,10 @@ async def root_get(self) -> NoReturn:
self.r({}, "application/json")

async def health(self, _request: Request) -> aiohttp.web.Response:
resp: JsonObject = {"process_uptime_sec": int(time.monotonic() - self._process_start_time)}
resp: JsonObject = {
"process_uptime_sec": int(time.monotonic() - self._process_start_time),
"Karapace-Version": __version__,
}
for hook in self.health_hooks:
resp.update(await hook())
return aiohttp.web.Response(
Expand Down
74 changes: 53 additions & 21 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"""
from __future__ import annotations

from kafka import KafkaProducer
from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient
from karapace.version import __version__
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand All @@ -27,7 +31,7 @@
NEW_TOPIC_TIMEOUT = 10


def check_successful_publish_response(success_response, objects, partition_id=None):
def check_successful_publish_response(success_response, objects, partition_id=None) -> None:
assert success_response.ok
success_response = success_response.json()
for k in ["value_schema_id", "offsets"]:
Expand All @@ -40,15 +44,25 @@ def check_successful_publish_response(success_response, objects, partition_id=No
assert partition_id == o["partition"]


async def test_request_body_too_large(rest_async_client, admin_client):
async def test_health_endpoint(rest_async_client: Client) -> None:
    """Verify that /_health reports the process uptime and the running Karapace version."""
    result = await rest_async_client.get("/_health")
    assert result.status_code == 200

    body = result.json()
    # Both keys must exist before their values are inspected.
    for expected_key in ("process_uptime_sec", "Karapace-Version"):
        assert expected_key in body
    assert body["process_uptime_sec"] >= 0
    assert body["Karapace-Version"] == __version__


async def test_request_body_too_large(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
    """An over-sized publish request must be rejected by the REST proxy with HTTP 413.

    Note: the annotations were swapped in the original (`rest_async_client` is the
    HTTP ``Client`` used for ``.post``; ``admin_client`` is the ``KafkaRestAdminClient``
    passed to ``new_topic``) — fixed here to match the body and sibling tests.
    """
    tn = new_topic(admin_client)
    await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
    # 1 MiB of value payload — large enough to trip the proxy's request-size limit.
    pl = {"records": [{"value": 1_048_576 * "a"}]}
    res = await rest_async_client.post(f"/topics/{tn}", pl, headers={"Content-Type": "application/json"})
    assert res.status_code == 413


async def test_content_types(rest_async_client, admin_client):
async def test_content_types(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None:
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
valid_headers = [
Expand Down Expand Up @@ -120,7 +134,7 @@ async def test_content_types(rest_async_client, admin_client):
assert not res.ok


async def test_avro_publish_primitive_schema(rest_async_client, admin_client):
async def test_avro_publish_primitive_schema(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None:
topic_str = new_topic(admin_client)
topic_int = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_str, topic_int], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -142,7 +156,11 @@ async def test_avro_publish_primitive_schema(rest_async_client, admin_client):
assert "partition" in o


async def test_avro_publish(rest_async_client, registry_async_client, admin_client):
async def test_avro_publish(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
tn = new_topic(admin_client)
other_tn = new_topic(admin_client)

Expand Down Expand Up @@ -186,7 +204,7 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_admin_client(admin_client, producer):
async def test_admin_client(admin_client: KafkaRestAdminClient, producer: KafkaProducer) -> None:
topic_names = [new_topic(admin_client) for i in range(10, 13)]
topic_info = admin_client.cluster_metadata()
retrieved_names = list(topic_info["topics"].keys())
Expand Down Expand Up @@ -226,7 +244,7 @@ async def test_admin_client(admin_client, producer):
admin_client.cluster_metadata(topics=["another_invalid_name"])


async def test_internal(rest_async, admin_client):
async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
prepared_records = [
[b"key", b"value", 0],
Expand Down Expand Up @@ -259,7 +277,7 @@ async def test_internal(rest_async, admin_client):
assert rest_async_proxy.all_empty({"records": [{"value": {"foo": "bar"}}]}, "key") is True


async def test_topics(rest_async_client, admin_client):
async def test_topics(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_foo = "foo"
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -280,7 +298,7 @@ async def test_topics(rest_async_client, admin_client):
assert res.json()["error_code"] == 40403, "Error code does not match"


async def test_list_topics(rest_async_client, admin_client):
async def test_list_topics(rest_async_client, admin_client) -> None:
tn1 = new_topic(admin_client)
tn2 = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn1, tn2], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -299,7 +317,7 @@ async def test_list_topics(rest_async_client, admin_client):
assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}"


async def test_publish(rest_async_client, admin_client):
async def test_publish(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
topic_url = f"/topics/{topic}"
Expand All @@ -319,7 +337,7 @@ async def test_publish(rest_async_client, admin_client):

# Produce messages to a topic without key and without explicit partition to verify that
# partitioner assigns partition randomly
async def test_publish_random_partitioning(rest_async_client, admin_client):
async def test_publish_random_partitioning(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic = new_topic(admin_client, num_partitions=100)
await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
topic_url = f"/topics/{topic}"
Expand All @@ -338,7 +356,7 @@ async def test_publish_random_partitioning(rest_async_client, admin_client):
assert len(partitions_seen) >= 2, "Partitioner should randomly assign to different partitions if no key given"


async def test_publish_malformed_requests(rest_async_client, admin_client):
async def test_publish_malformed_requests(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
for url in [f"/topics/{topic_name}", f"/topics/{topic_name}/partitions/0"]:
Expand Down Expand Up @@ -376,7 +394,7 @@ async def test_publish_malformed_requests(rest_async_client, admin_client):
assert res.status_code == 422


async def test_too_large_record(rest_async_client, admin_client):
async def test_too_large_record(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
# Record batch overhead is 22 bytes, reduce just above
Expand All @@ -392,7 +410,7 @@ async def test_too_large_record(rest_async_client, admin_client):
)


async def test_publish_to_nonexisting_topic(rest_async_client):
async def test_publish_to_nonexisting_topic(rest_async_client: Client) -> None:
tn = new_random_name("topic-that-should-not-exist")
header = REST_HEADERS["avro"]
# check succeeds with 1 record and brand new schema
Expand All @@ -405,7 +423,11 @@ async def test_publish_to_nonexisting_topic(rest_async_client):
assert res.json()["error_code"] == 40401, "Error code should be for topic not found"


async def test_publish_with_incompatible_data(rest_async_client, registry_async_client, admin_client):
async def test_publish_with_incompatible_data(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
topic_name = new_topic(admin_client)
subject_1 = f"{topic_name}-value"

Expand Down Expand Up @@ -448,7 +470,7 @@ async def test_publish_with_incompatible_data(rest_async_client, registry_async_
assert "Object does not fit to stored schema" in res_json["message"]


async def test_publish_with_incompatible_schema(rest_async_client, admin_client):
async def test_publish_with_incompatible_schema(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
url = f"/topics/{topic_name}"
Expand Down Expand Up @@ -493,7 +515,11 @@ async def test_publish_with_incompatible_schema(rest_async_client, admin_client)
assert "Error when registering schema" in res_json["message"]


async def test_publish_with_schema_id_of_another_subject(rest_async_client, registry_async_client, admin_client):
async def test_publish_with_schema_id_of_another_subject(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
"""
Karapace issue 658: https://github.com/aiven/karapace/issues/658
"""
Expand Down Expand Up @@ -560,8 +586,10 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi


async def test_publish_with_schema_id_of_another_subject_novalidation(
rest_async_novalidation_client, registry_async_client, admin_client
):
rest_async_novalidation_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
"""
Same as above but with name_strategy_validation disabled as config
"""
Expand Down Expand Up @@ -623,13 +651,17 @@ async def test_publish_with_schema_id_of_another_subject_novalidation(
assert res.status_code == 200


async def test_brokers(rest_async_client):
async def test_brokers(rest_async_client: Client) -> None:
res = await rest_async_client.get("/brokers")
assert res.ok
assert len(res.json()) == 1, "Only one broker should be running"


async def test_partitions(rest_async_client, admin_client, producer):
async def test_partitions(
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
producer: KafkaProducer,
) -> None:
# TODO -> This seems to be the only combination accepted by the offsets endpoint
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand Down

0 comments on commit bd48eb9

Please sign in to comment.