Skip to content

Commit

Permalink
Merge pull request #472 from aiven/jjaakola-aiven-schema-backup-tool-…
Browse files Browse the repository at this point in the history
…crashed-on-null-key-and-value

fix: tolerate null record keys and values on schema backup and restore
  • Loading branch information
tvainika committed Oct 19, 2022
2 parents da670e3 + c929e16 commit 2966a54
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ repos:
rev: v4.1.0
hooks:
- id: trailing-whitespace
exclude: ^vendor/|^tests/.*/fixtures/.*
exclude: ^vendor/|^tests/.*/fixtures/.*|^tests/integration/test_data/.*
- id: end-of-file-fixer
exclude: ^vendor/|^tests/.*/fixtures/.*
exclude: ^vendor/|^tests/.*/fixtures/.*|^tests/integration/test_data/.*
- id: debug-statements

# https://pre-commit.com/#repository-local-hooks
Expand Down
18 changes: 12 additions & 6 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ def export(self, export_func) -> None:
LOG.info("Schema export written to stdout")
self.close()

def encode_key(self, key: Union[JsonData, str]) -> bytes:
def encode_key(self, key: Optional[Union[JsonData, str]]) -> bytes:
if not key:
return b""
if not self.key_formatter:
if isinstance(key, str):
return key.encode("utf8")
Expand All @@ -258,9 +260,13 @@ def encode_value(value: Union[JsonData, str]) -> Optional[bytes]:
return json_encode(value, sort_keys=False, binary=True)


def serialize_schema_message(key_bytes: bytes, value_bytes: bytes) -> str:
key = base64.b16encode(key_bytes).decode("utf8")
value = base64.b16encode(value_bytes).decode("utf8")
def serialize_record(key_bytes: Optional[bytes], value_bytes: Optional[bytes]) -> str:
    """Serialize one record as a single backup line.

    The key and the value are each written as upper-case hexadecimal
    (base16) text, separated by a tab and terminated by a newline.

    Args:
        key_bytes: Raw record key, or ``None`` when the record has no key.
        value_bytes: Raw record value, or ``None`` when the record has no value.

    Returns:
        The line ``"<hex key>\\t<hex value>\\n"``. A ``None`` key or value
        yields an empty field.
    """
    # The fallbacks must be ``str``: interpolating ``b""`` into the f-string
    # below would emit the literal text ``b''`` instead of an empty field.
    key = ""
    if key_bytes is not None:
        key = base64.b16encode(key_bytes).decode("utf8")
    value = ""
    if value_bytes is not None:
        value = base64.b16encode(value_bytes).decode("utf8")
    return f"{key}\t{value}\n"


Expand All @@ -281,7 +287,7 @@ def anonymize_avro_schema_message(key_bytes: bytes, value_bytes: bytes) -> str:
# The schemas topic contain all changes to schema metadata.
if key.get("subject", None):
key["subject"] = anonymize_avro.anonymize_name(key["subject"])
return serialize_schema_message(json.dumps(key).encode("utf8"), json.dumps(value).encode("utf8"))
return serialize_record(json.dumps(key).encode("utf8"), json.dumps(value).encode("utf8"))


def parse_args():
Expand Down Expand Up @@ -310,7 +316,7 @@ def main() -> int:
sb = SchemaBackup(config, args.location, args.topic)

if args.command == "get":
sb.export(serialize_schema_message)
sb.export(serialize_record)
return 0
if args.command == "restore":
sb.restore_backup()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/V2
6B65795F6F6E65 76616C75655F6F6E65
6B65795F74776F 76616C75655F74776F
76616C75655F7468726565
6B65795F666F7572
1 change: 1 addition & 0 deletions tests/integration/test_data/test_restore_v2.log
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/V2
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20312C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D 7B0A202020202264656C65746564223A2066616C73652C0A20202020226964223A20312C0A2020202022736368656D61223A20225C22737472696E675C22222C0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20310A7D
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20322C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D 7B0A202020202264656C65746564223A2066616C73652C0A20202020226964223A20322C0A2020202022736368656D61223A20225C22737472696E675C22222C0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20320A7D
7B0A20202020227375626A656374223A20227375626A6563742D31222C0A202020202276657273696F6E223A20322C0A20202020226D61676963223A20312C0A20202020226B657974797065223A2022534348454D41220A7D
42 changes: 39 additions & 3 deletions tests/integration/test_schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from karapace.client import Client
from karapace.config import set_config_defaults
from karapace.key_format import is_key_in_canonical_format
from karapace.schema_backup import SchemaBackup, serialize_schema_message
from karapace.schema_backup import SchemaBackup, serialize_record
from karapace.utils import Expiration
from pathlib import Path
from tests.integration.utils.cluster import RegistryDescription
Expand Down Expand Up @@ -51,7 +51,7 @@ async def test_backup_get(
}
)
sb = SchemaBackup(config, str(backup_location))
sb.export(serialize_schema_message)
sb.export(serialize_record)

# The backup file has been created
assert os.path.exists(backup_location)
Expand All @@ -68,6 +68,43 @@ async def test_backup_get(
assert lines == 1


async def test_backup_restore_and_get_non_schema_topic(
    kafka_servers: KafkaServers,
    tmp_path: Path,
) -> None:
    """Restore a fixture into a fresh topic, export it again and compare the files.

    The fixture is restored into a randomly named topic, then exported to a
    file under ``tmp_path``; the exported text must equal the fixture text.
    """
    topic = new_random_name("non-schemas")
    config = set_config_defaults(
        {
            "bootstrap_uri": kafka_servers.bootstrap_servers,
            "topic_name": topic,
        }
    )

    # Step 1: restore the fixture file into the topic.
    fixture_file = Path("tests/integration/test_data/") / "test_restore_non_schema_topic_v2.log"
    SchemaBackup(config, str(fixture_file)).restore_backup()

    # Step 2: export the topic into a new backup file.
    exported_file = tmp_path / "non_schemas_topic.log"
    SchemaBackup(config, str(exported_file)).export(serialize_record)
    # The backup file has been created
    assert exported_file.exists()

    # Step 3: the export must reproduce the fixture exactly.
    expected_text = fixture_file.read_text(encoding="utf8")
    exported_text = exported_file.read_text(encoding="utf8")
    assert expected_text == exported_text


def _assert_canonical_key_format(
bootstrap_servers: str,
schemas_topic: str,
Expand All @@ -86,7 +123,6 @@ def _assert_canonical_key_format(
while raw_msgs:
for _, messages in raw_msgs.items():
for message in messages:
print(message)
key = json.loads(message.key)
assert is_key_in_canonical_format(key), f"Not in canonical format: {key}"
raw_msgs = consumer.poll()
Expand Down

0 comments on commit 2966a54

Please sign in to comment.