Python Rest Client to interact against schema-registry confluent server to manage Avro Schemas resources.
python 3.6+
pip install python-schema-registry-client
If you want the Faust
functionality:
pip install python-schema-registry-client[faust]
Note that this will automatically add a dependency on the faust-streaming fork of faust. If you want to use the
old faust version, simply install it manually and then install python-schema-registry-client
without the faust
extra enabled, the functionality will
be the same.
Documentation: https://marcosschroh.github.io/python-schema-registry-client.io
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = client.register("test-deployment", avro_schema)
or async
from schema_registry.client import AsyncSchemaRegistryClient, schema
async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = await async_client.register("test-deployment", avro_schema)
You can generate the avro schema
directely from a python class using dataclasses-avroschema
and use it in the API for register schemas
, check versions
and test compatibility
:
import dataclasses
from dataclasses_avroschema import AvroModel, types
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
@dataclasses.dataclass
class UserAdvance(AvroModel):
name: str
age: int
pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
has_car: bool = False
favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
country: str = "Argentina"
address: str = None
# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')
compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)
# >>> True
Usually, we have a situacion like this:
So, our producers/consumers have to serialize/deserialize messages every time that they send/receive from Kafka topics. In this picture, we can imagine a Faust
application receiving messages (encoded with an Avro schema) and we want to deserialize them, so we can ask the schema server
to do that for us. In this scenario, the MessageSerializer
is perfect.
Also, could be a use case that we would like to have an Application only to administrate Avro Schemas
(register, update compatibilities, delete old schemas, etc.), so the SchemaRegistryClient
is perfect.
Install the project and development utilities in edit mode:
pip3 install -e ".[tests,docs,faust]
The tests are run against the Schema Server
using docker compose
, so you will need
Docker
and Docker Compose
installed.
./scripts/test
Run code linting:
./scripts/lint
To perform tests using the python shell you can execute docker-compose up
and the schema registry server
will run on http://127.0.0.1:8081
, the you can interact against it using the SchemaRegistryClient
:
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
# do some operations with the client...