From 3b75dccaa60ef33c91d10933c952b6ed21b06668 Mon Sep 17 00:00:00 2001 From: "Fabiano V. Santos" Date: Wed, 13 Nov 2019 12:16:25 -0300 Subject: [PATCH] Configuration to disable auto schema creation --- docs/schema-registry.rst | 5 ++++ .../serde/AbstractSnapshotSerializer.java | 26 ++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/docs/schema-registry.rst b/docs/schema-registry.rst index 5b925dc92..0c1a33b68 100644 --- a/docs/schema-registry.rst +++ b/docs/schema-registry.rst @@ -146,6 +146,7 @@ Important settings from the above are config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, "true"); config.put(KafkaAvroSerde.KEY_SCHEMA_VERSION_ID_HEADER_NAME, "key.schema.version.id"); // optional config.put(KafkaAvroSerde.VALUE_SCHEMA_VERSION_ID_HEADER_NAME, "value.schema.version.id"); // optional + config.put("schema.auto.create", "true"); // optional **store.schema.version.id.in.header** By default, this is set to 'false' to maintain backward compatibility. User needs to enable it to save the schema version @@ -159,6 +160,10 @@ key.serializer is set to 'KafkaAvroSerializer'. Configurable header name to save the Record Value schema version information. This configuration is applicable only when value.serializer is set to 'KafkaAvroSerializer'. +**schema.auto.create** +By default, this is set to 'true' to maintain backward compatibility. User needs to disable it so the client won't try to +create schemas if it does not exists. + Run consumer to retrieve schema and deserialze the messages ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/serde/AbstractSnapshotSerializer.java b/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/serde/AbstractSnapshotSerializer.java index af2cb1f01..20cb8e8c5 100644 --- a/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/serde/AbstractSnapshotSerializer.java +++ b/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/serde/AbstractSnapshotSerializer.java @@ -17,12 +17,16 @@ import com.hortonworks.registries.schemaregistry.SchemaIdVersion; import com.hortonworks.registries.schemaregistry.SchemaMetadata; import com.hortonworks.registries.schemaregistry.SchemaVersion; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; import com.hortonworks.registries.schemaregistry.client.ISchemaRegistryClient; import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException; import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException; import com.hortonworks.registries.schemaregistry.errors.SchemaBranchNotFoundException; import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import com.hortonworks.registries.schemaregistry.exceptions.RegistryException; +import org.apache.commons.lang3.BooleanUtils; + +import java.util.Map; /** * This class implements {@link SnapshotSerializer} and internally creates schema registry client to connect to the @@ -36,6 +40,10 @@ */ public abstract class AbstractSnapshotSerializer extends AbstractSerDes implements SnapshotSerializer { + private static final String AUTO_CREATE_SCHEMA = "schema.auto.create"; + + private boolean autoCreateSchema = true; + public AbstractSnapshotSerializer() { } @@ -43,6 +51,15 @@ public AbstractSnapshotSerializer(ISchemaRegistryClient schemaRegistryClient) { super(schemaRegistryClient); } + @SuppressWarnings("unchecked") + @Override + protected void doInit(Map config) { + super.doInit(config); + + this.autoCreateSchema = BooleanUtils.toBoolean(((Map) config) + .getOrDefault(AUTO_CREATE_SCHEMA, "true").toString()); + } + @Override public final O serialize(I input, SchemaMetadata schemaMetadata) throws SerDesException { ensureInitialized(); @@ -51,8 +68,15 @@ public final O serialize(I input, SchemaMetadata schemaMetadata) throws SerDesEx String schema = getSchemaText(input); // register that schema and get the version + SchemaIdVersion schemaIdVersion; try { - SchemaIdVersion schemaIdVersion = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schema, "Schema registered by serializer:" + this.getClass())); + if(autoCreateSchema){ + schemaIdVersion = schemaRegistryClient.addSchemaVersion(schemaMetadata, + new SchemaVersion(schema, "Schema registered by serializer:" + this.getClass())); + } else { + SchemaVersionInfo info = schemaRegistryClient.getLatestSchemaVersionInfo(schemaMetadata.getName()); + schemaIdVersion = new SchemaIdVersion(info.getSchemaMetadataId(), info.getVersion()); + } // write the version and given object to the output return doSerialize(input, schemaIdVersion); } catch (SchemaNotFoundException | IncompatibleSchemaException | InvalidSchemaException | SchemaBranchNotFoundException e) {