diff --git a/karapace/karapace.py b/karapace/karapace.py index c06b6bcb5..646a9a87f 100644 --- a/karapace/karapace.py +++ b/karapace/karapace.py @@ -14,6 +14,7 @@ from karapace.rapu import HTTPRequest, HTTPResponse, RestApp from karapace.typing import JsonObject from karapace.utils import json_encode +from karapace.version import __version__ from typing import Awaitable, Callable, NoReturn from typing_extensions import TypeAlias @@ -79,7 +80,10 @@ async def root_get(self) -> NoReturn: self.r({}, "application/json") async def health(self, _request: Request) -> aiohttp.web.Response: - resp: JsonObject = {"process_uptime_sec": int(time.monotonic() - self._process_start_time)} + resp: JsonObject = { + "process_uptime_sec": int(time.monotonic() - self._process_start_time), + "Karapace-Version": __version__, + } for hook in self.health_hooks: resp.update(await hook()) return aiohttp.web.Response( diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index c7a49813c..01ae6efa2 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -4,7 +4,11 @@ """ from __future__ import annotations +from kafka import KafkaProducer from kafka.errors import UnknownTopicOrPartitionError +from karapace.client import Client +from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient +from karapace.version import __version__ from pytest import raises from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( @@ -27,7 +31,7 @@ NEW_TOPIC_TIMEOUT = 10 -def check_successful_publish_response(success_response, objects, partition_id=None): +def check_successful_publish_response(success_response, objects, partition_id=None) -> None: assert success_response.ok success_response = success_response.json() for k in ["value_schema_id", "offsets"]: @@ -40,7 +44,17 @@ def check_successful_publish_response(success_response, objects, partition_id=No assert partition_id == o["partition"] -async def test_request_body_too_large(rest_async_client, admin_client): +async def test_health_endpoint(rest_async_client: Client) -> None: + res = await rest_async_client.get("/_health") + assert res.status_code == 200 + response = res.json() + assert "process_uptime_sec" in response + assert "Karapace-Version" in response + assert response["process_uptime_sec"] >= 0 + assert response["Karapace-Version"] == __version__ + + +async def test_request_body_too_large(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) pl = {"records": [{"value": 1_048_576 * "a"}]} @@ -48,7 +62,7 @@ async def test_request_body_too_large(rest_async_client, admin_client): assert res.status_code == 413 -async def test_content_types(rest_async_client, admin_client): +async def test_content_types(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) valid_headers = [ @@ -120,7 +134,7 @@ async def test_content_types(rest_async_client, admin_client): assert not res.ok -async def test_avro_publish_primitive_schema(rest_async_client, admin_client): +async def test_avro_publish_primitive_schema(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: topic_str = new_topic(admin_client) topic_int = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_str, topic_int], timeout=NEW_TOPIC_TIMEOUT, sleep=1) @@ -142,7 +156,11 @@ async def test_avro_publish_primitive_schema(rest_async_client, admin_client): assert "partition" in o -async def test_avro_publish(rest_async_client, registry_async_client, admin_client): +async def test_avro_publish( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: tn = new_topic(admin_client) other_tn = new_topic(admin_client) @@ -186,7 +204,7 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie # assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}" -async def test_admin_client(admin_client, producer): +async def test_admin_client(admin_client: KafkaRestAdminClient, producer: KafkaProducer) -> None: topic_names = [new_topic(admin_client) for i in range(10, 13)] topic_info = admin_client.cluster_metadata() retrieved_names = list(topic_info["topics"].keys()) @@ -226,7 +244,7 @@ async def test_admin_client(admin_client, producer): admin_client.cluster_metadata(topics=["another_invalid_name"]) -async def test_internal(rest_async, admin_client): +async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaRestAdminClient) -> None: topic_name = new_topic(admin_client) prepared_records = [ [b"key", b"value", 0], @@ -259,7 +277,7 @@ async def test_internal(rest_async, admin_client): assert rest_async_proxy.all_empty({"records": [{"value": {"foo": "bar"}}]}, "key") is True -async def test_topics(rest_async_client, admin_client): +async def test_topics(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: topic_foo = "foo" tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) @@ -280,7 +298,7 @@ async def test_topics(rest_async_client, admin_client): assert res.json()["error_code"] == 40403, "Error code does not match" -async def test_list_topics(rest_async_client, admin_client): +async def test_list_topics(rest_async_client, admin_client) -> None: tn1 = new_topic(admin_client) tn2 = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn1, tn2], timeout=NEW_TOPIC_TIMEOUT, sleep=1) @@ -299,7 +317,7 @@ async def test_list_topics(rest_async_client, admin_client): assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}" -async def test_publish(rest_async_client, admin_client): +async def test_publish(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: topic = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) topic_url = f"/topics/{topic}" @@ -319,7 +337,7 @@ async def test_publish(rest_async_client, admin_client): # Produce messages to a topic without key and without explicit partition to verify that # partitioner assigns partition randomly -async def test_publish_random_partitioning(rest_async_client, admin_client): +async def test_publish_random_partitioning(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: topic = new_topic(admin_client, num_partitions=100) await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) topic_url = f"/topics/{topic}" @@ -338,7 +356,7 @@ async def test_publish_random_partitioning(rest_async_client, admin_client): assert len(partitions_seen) >= 2, "Partitioner should randomly assign to different partitions if no key given" -async def test_publish_malformed_requests(rest_async_client, admin_client): +async def test_publish_malformed_requests(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: topic_name = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) for url in [f"/topics/{topic_name}", f"/topics/{topic_name}/partitions/0"]: @@ -376,7 +394,7 @@ async def test_publish_malformed_requests(rest_async_client, admin_client): assert res.status_code == 422 -async def test_too_large_record(rest_async_client, admin_client): +async def test_too_large_record(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) # Record batch overhead is 22 bytes, reduce just above @@ -392,7 +410,7 @@ async def test_too_large_record(rest_async_client, admin_client): ) -async def test_publish_to_nonexisting_topic(rest_async_client): +async def test_publish_to_nonexisting_topic(rest_async_client: Client) -> None: tn = new_random_name("topic-that-should-not-exist") header = REST_HEADERS["avro"] # check succeeds with 1 record and brand new schema @@ -405,7 +423,11 @@ async def test_publish_to_nonexisting_topic(rest_async_client): assert res.json()["error_code"] == 40401, "Error code should be for topic not found" -async def test_publish_with_incompatible_data(rest_async_client, registry_async_client, admin_client): +async def test_publish_with_incompatible_data( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: topic_name = new_topic(admin_client) subject_1 = f"{topic_name}-value" @@ -448,7 +470,7 @@ async def test_publish_with_incompatible_data(rest_async_client, registry_async_ assert "Object does not fit to stored schema" in res_json["message"] -async def test_publish_with_incompatible_schema(rest_async_client, admin_client): +async def test_publish_with_incompatible_schema(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: topic_name = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) url = f"/topics/{topic_name}" @@ -493,7 +515,11 @@ async def test_publish_with_incompatible_schema(rest_async_client, admin_client) assert "Error when registering schema" in res_json["message"] -async def test_publish_with_schema_id_of_another_subject(rest_async_client, registry_async_client, admin_client): +async def test_publish_with_schema_id_of_another_subject( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: """ Karapace issue 658: https://github.com/aiven/karapace/issues/658 """ @@ -560,8 +586,10 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi async def test_publish_with_schema_id_of_another_subject_novalidation( - rest_async_novalidation_client, registry_async_client, admin_client -): + rest_async_novalidation_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: """ Same as above but with name_strategy_validation disabled as config """ @@ -623,13 +651,17 @@ async def test_publish_with_schema_id_of_another_subject_novalidation( assert res.status_code == 200 -async def test_brokers(rest_async_client): +async def test_brokers(rest_async_client: Client) -> None: res = await rest_async_client.get("/brokers") assert res.ok assert len(res.json()) == 1, "Only one broker should be running" -async def test_partitions(rest_async_client, admin_client, producer): +async def test_partitions( + rest_async_client: Client, + admin_client: KafkaRestAdminClient, + producer: KafkaProducer, +) -> None: # TODO -> This seems to be the only combination accepted by the offsets endpoint topic_name = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)