diff --git a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java
similarity index 80%
rename from schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java
rename to schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java
index d676edd9b..ca8d63f6e 100644
--- a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java
+++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2018-2019 Cloudera, Inc.
+ * Copyright 2018-2023 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,9 @@
* limitations under the License.
**/
-package com.hortonworks.registries.schemaregistry.cache;
+package com.hortonworks.registries.schemaregistry.client;
+
+import com.hortonworks.registries.schemaregistry.cache.SchemaRegistryCacheType;
public interface AbstractCache {
diff --git a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
index e44bbefdf..e3fe548f4 100644
--- a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
+++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
@@ -45,7 +45,6 @@
import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
import com.hortonworks.registries.schemaregistry.SerDesInfo;
import com.hortonworks.registries.schemaregistry.SerDesPair;
-import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache;
import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException;
import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaBranchDeletionException;
import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException;
@@ -339,19 +338,20 @@ private SchemaMetadataCache createSchemaMetadataCache() {
private SchemaVersionInfoCache createSchemaVersionInfoCache() {
return new SchemaVersionInfoCache(
- new SchemaVersionRetriever() {
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
- return doGetSchemaVersionInfo(key);
- }
+ new SchemaVersionRetriever() {
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
+ return doGetSchemaVersionInfo(key);
+ }
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
- return doGetSchemaVersionInfo(key);
- }
- },
- ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(),
- ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
+ return doGetSchemaVersionInfo(key);
+ }
+ },
+ createSchemaMetadataFetcher(),
+ ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(),
+ ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L
);
}
diff --git a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java
new file mode 100644
index 000000000..72f18a9ae
--- /dev/null
+++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java
@@ -0,0 +1,318 @@
+/**
+ * Copyright 2016-2023 Cloudera, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.hortonworks.registries.schemaregistry.client;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
+import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
+import com.hortonworks.registries.schemaregistry.cache.SchemaRegistryCacheType;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckForNull;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.cache.RemovalCause.REPLACED;
+
+/**
+ * Loading cache for {@link Key} with values {@link SchemaVersionInfo}.
+ */
+public class SchemaVersionInfoCache implements AbstractCache {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaVersionInfoCache.class);
+
+ private final Cache cache;
+ private final ConcurrentMap metadataNameVersion2Id;
+ private final ConcurrentMap metadataIdVersion2Id;
+ private final SchemaVersionRetriever schemaVersionRetriever;
+
+ private final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher;
+
+ SchemaVersionInfoCache(final SchemaVersionRetriever schemaVersionRetriever,
+ final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher,
+ final int schemaCacheSize,
+ final long schemaCacheExpiryInMilliSecs,
+ ConcurrentMap metadataNameVersion2Id,
+ ConcurrentMap metadataIdVersion2Id
+ ) {
+ this.metadataNameVersion2Id = metadataNameVersion2Id;
+ this.metadataIdVersion2Id = metadataIdVersion2Id;
+ this.schemaVersionRetriever = schemaVersionRetriever;
+ this.schemaMetadataFetcher = schemaMetadataFetcher;
+ cache = createCache(schemaCacheSize, schemaCacheExpiryInMilliSecs);
+ }
+
+ public SchemaVersionInfoCache(final SchemaVersionRetriever schemaVersionRetriever,
+ final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher,
+ final int schemaCacheSize,
+ final long schemaCacheExpiryInMilliSecs) {
+ this(schemaVersionRetriever, schemaMetadataFetcher, schemaCacheSize, schemaCacheExpiryInMilliSecs,
+ new ConcurrentHashMap<>(schemaCacheSize),
+ new ConcurrentHashMap<>(schemaCacheSize));
+ }
+
+ public SchemaVersionInfo getSchema(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException {
+ try {
+ LOG.debug("Trying to load entry for cache with key [{}] from target service", key);
+ SchemaVersionInfo schemaVersionInfo = null;
+ if (key.schemaVersionKey != null) {
+ schemaVersionInfo = getSchemaVersionInfoFromSchemaVersionKey(key.schemaVersionKey);
+ } else if (key.schemaIdVersion != null) {
+ schemaVersionInfo = getSchemaVersionInfoFromSchemaIdVersion(key.schemaIdVersion);
+ } else {
+ throw new IllegalArgumentException("Given argument is not valid: " + key);
+ }
+ LOG.trace("Result: {}", schemaVersionInfo);
+ return schemaVersionInfo;
+ } catch (SchemaNotFoundException snfe) {
+ if (key.schemaVersionKey != null) {
+ throw new SchemaNotFoundException(key.schemaVersionKey.toString(), snfe.getMessage());
+ } else {
+ throw snfe;
+ }
+ } catch (NotFoundException nfe) {
+ throw new SchemaNotFoundException(key.toString(), nfe);
+ }
+ }
+
+ @CheckForNull
+ public SchemaVersionInfo getSchemaIfPresent(SchemaVersionInfoCache.Key key) {
+ LOG.debug("Trying to get entry from cache if it is present in local cache with key [{}]", key);
+ Long id = null;
+ if (key.schemaVersionKey != null) {
+ id = metadataNameVersion2Id.getOrDefault(key.schemaVersionKey, null);
+ } else if (key.schemaIdVersion != null) {
+ id = getIdFromSchemaIdVersionOrCache(key.schemaIdVersion);
+ }
+ if (id != null) {
+ return cache.getIfPresent(id);
+ }
+ return null;
+ }
+
+ public void invalidateSchema(SchemaVersionInfoCache.Key key) {
+ LOG.debug("Invalidating cache entry for key [{}]", key);
+ if (key.schemaVersionKey != null) {
+ invalidateBySchemaVersionKey(key.schemaVersionKey);
+ } else if (key.schemaIdVersion != null) {
+ invalidateBySchemaIdVersion(key.schemaIdVersion);
+ } else {
+ throw new IllegalArgumentException("Given argument is not valid: " + key);
+ }
+ }
+
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ @Override
+ public SchemaRegistryCacheType getCacheType() {
+ return SchemaRegistryCacheType.SCHEMA_VERSION_CACHE;
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class Key {
+
+ @JsonProperty
+ private SchemaVersionKey schemaVersionKey;
+
+ @JsonProperty
+ private SchemaIdVersion schemaIdVersion;
+
+ public Key(SchemaVersionKey schemaVersionKey) {
+ this.schemaVersionKey = schemaVersionKey;
+ }
+
+ public Key(SchemaIdVersion schemaIdVersion) {
+ this.schemaIdVersion = schemaIdVersion;
+ }
+
+ public static Key of(SchemaVersionKey schemaVersionKey) {
+ return new Key(schemaVersionKey);
+ }
+
+ public static Key of(SchemaIdVersion schemaIdVersion) {
+ return new Key(schemaIdVersion);
+ }
+
+ // For JSON serialization/deserialization
+ private Key() {
+
+ }
+
+ @Override
+ public String toString() {
+ return "Key {" +
+ "schemaVersionKey=" + schemaVersionKey +
+ ", schemaIdVersion=" + schemaIdVersion +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Key key = (Key) o;
+
+ if (schemaVersionKey != null ? !schemaVersionKey.equals(key.schemaVersionKey)
+ : key.schemaVersionKey != null) {
+ return false;
+ }
+ return schemaIdVersion != null ? schemaIdVersion.equals(key.schemaIdVersion) : key.schemaIdVersion == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = schemaVersionKey != null ? schemaVersionKey.hashCode() : 0;
+ result = 31 * result + (schemaIdVersion != null ? schemaIdVersion.hashCode() : 0);
+ return result;
+ }
+ }
+
+ private SchemaVersionInfo getSchemaVersionInfoFromSchemaIdVersion(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
+ Long id = getIdFromSchemaIdVersionOrCache(schemaIdVersion);
+ if (id != null) {
+ SchemaVersionInfo schemaVersionInfo = cache.getIfPresent(id);
+ if (schemaVersionInfo != null) {
+ return schemaVersionInfo;
+ }
+ }
+ SchemaVersionInfo schemaVersionInfo = schemaVersionRetriever.retrieveSchemaVersion(schemaIdVersion);
+ schemaVersionInfo = updateSchemaVersionInfoIfSchemaMetadataIdIsNull(schemaVersionInfo, schemaIdVersion.getSchemaMetadataId());
+ updateCaches(schemaVersionInfo, createSchemaVersionKeyFromSchemaVersionInfo(schemaVersionInfo));
+ return schemaVersionInfo;
+ }
+
+ private SchemaVersionInfo updateSchemaVersionInfoIfSchemaMetadataIdIsNull(SchemaVersionInfo schemaVersionInfo,
+ Long schemaMetadataId) throws SchemaNotFoundException {
+ if (schemaVersionInfo.getSchemaMetadataId() != null) {
+ return schemaVersionInfo;
+ }
+ Long metadataId = schemaMetadataId;
+ if (metadataId == null) {
+ SchemaMetadataInfo schemaMetadataInfo = schemaMetadataFetcher.fetch(schemaVersionInfo.getName());
+ if (schemaMetadataInfo == null) {
+ throw new SchemaNotFoundException(schemaVersionInfo.getName(), "Could not find schema.");
+ }
+ metadataId = schemaMetadataInfo.getId();
+ }
+ return new SchemaVersionInfo(schemaVersionInfo.getId(),
+ schemaVersionInfo.getName(),
+ schemaVersionInfo.getVersion(),
+ metadataId,
+ schemaVersionInfo.getSchemaText(),
+ schemaVersionInfo.getTimestamp(),
+ schemaVersionInfo.getDescription(),
+ schemaVersionInfo.getStateId()
+ );
+ }
+
+ private void updateCaches(SchemaVersionInfo schemaVersionInfo, SchemaVersionKey schemaVersionKey) {
+ long id = schemaVersionInfo.getId();
+ metadataNameVersion2Id.put(schemaVersionKey, id);
+ metadataIdVersion2Id.put(createSchemaIdVersionFromSchemaVersionInfo(schemaVersionInfo), id);
+ cache.put(id, schemaVersionInfo);
+ }
+
+ private SchemaVersionInfo getSchemaVersionInfoFromSchemaVersionKey(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException {
+ SchemaVersionInfo schemaVersionInfo = getSchemaVersionInfoFromCache(schemaVersionKey);
+ if (schemaVersionInfo != null) {
+ return schemaVersionInfo;
+ }
+ schemaVersionInfo = schemaVersionRetriever.retrieveSchemaVersion(schemaVersionKey);
+ schemaVersionInfo = updateSchemaVersionInfoIfSchemaMetadataIdIsNull(schemaVersionInfo, null);
+ updateCaches(schemaVersionInfo, schemaVersionKey);
+
+ return schemaVersionInfo;
+ }
+
+ private SchemaVersionInfo getSchemaVersionInfoFromCache(SchemaVersionKey schemaVersionKey) {
+ Long id = metadataNameVersion2Id.getOrDefault(schemaVersionKey, null);
+ if (id != null) {
+ return cache.getIfPresent(id);
+ }
+ return null;
+ }
+
+ private void invalidateInternalCaches(SchemaVersionInfo schemaVersionInfo) {
+ SchemaVersionKey schemaVersionKey = createSchemaVersionKeyFromSchemaVersionInfo(schemaVersionInfo);
+ LOG.trace("Invalidate schemaVersionKey: {}", schemaVersionKey);
+ metadataNameVersion2Id.remove(schemaVersionKey);
+ SchemaIdVersion schemaIdVersion = createSchemaIdVersionFromSchemaVersionInfo(schemaVersionInfo);
+ LOG.trace("Invalidate schemaIdVersion: {}", schemaIdVersion);
+ metadataIdVersion2Id.remove(schemaIdVersion);
+ }
+
+ private static SchemaIdVersion createSchemaIdVersionFromSchemaVersionInfo(SchemaVersionInfo schemaVersionInfo) {
+ return new SchemaIdVersion(schemaVersionInfo.getSchemaMetadataId(), schemaVersionInfo.getVersion());
+ }
+
+ private static SchemaVersionKey createSchemaVersionKeyFromSchemaVersionInfo(SchemaVersionInfo schemaVersionInfo) {
+ return new SchemaVersionKey(schemaVersionInfo.getName(), schemaVersionInfo.getVersion());
+ }
+
+ private Cache createCache(int schemaCacheSize,
+ long schemaCacheExpiryInMilliSecs) {
+ return CacheBuilder.newBuilder()
+ .maximumSize(schemaCacheSize)
+ .expireAfterAccess(schemaCacheExpiryInMilliSecs, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalListener) notification -> {
+ if (notification.getCause() != REPLACED) {
+ SchemaVersionInfo id = notification.getValue();
+ if (id != null) {
+ invalidateInternalCaches(id);
+ }
+ }
+ })
+ .build();
+ }
+
+ private void invalidateBySchemaIdVersion(SchemaIdVersion key) {
+ Long id = getIdFromSchemaIdVersionOrCache(key);
+ if (id != null) {
+ cache.invalidate(id);
+ }
+ }
+
+ private Long getIdFromSchemaIdVersionOrCache(SchemaIdVersion key) {
+ Long id = key.getSchemaVersionId();
+ if (id == null) {
+ id = metadataIdVersion2Id.getOrDefault(key, null);
+ }
+ return id;
+ }
+
+ private void invalidateBySchemaVersionKey(SchemaVersionKey key) {
+ Long id = metadataNameVersion2Id.getOrDefault(key, null);
+ if (id != null) {
+ cache.invalidate(id);
+ }
+ }
+}
diff --git a/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java b/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java
new file mode 100644
index 000000000..3313aedfe
--- /dev/null
+++ b/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2016-2019 Cloudera, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hortonworks.registries.schemaregistry.client;
+
+import com.google.common.collect.Sets;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
+import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates;
+import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class SchemaVersionInfoCacheTest {
+
+ @Test
+ public void testSchemaVersionCache() throws Exception {
+
+ final Map schemaIdWithVersionInfo = new HashMap<>();
+ final Map schemaKeyWithVersionInfo = new HashMap<>();
+
+ long schemaMetadataId = 1L;
+ String schemaName = "schema-1";
+ Integer version = 2;
+ long schemaVersionId = 3L;
+ SchemaVersionInfo schemaVersionInfo =
+ new SchemaVersionInfo(schemaVersionId, schemaName, version, schemaMetadataId, "schema-text",
+ System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId());
+ SchemaIdVersion withVersionId = new SchemaIdVersion(schemaVersionId);
+ SchemaIdVersion withMetaIdAndVersion = new SchemaIdVersion(schemaMetadataId, version);
+ SchemaIdVersion withBoth = new SchemaIdVersion(schemaMetadataId, version, schemaVersionId);
+ SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+ schemaKeyWithVersionInfo.put(schemaVersionKey, schemaVersionInfo);
+ HashSet allIdVersions = Sets.newHashSet(withVersionId, withMetaIdAndVersion, withBoth);
+ allIdVersions.stream().forEach(x -> schemaIdWithVersionInfo.put(x, schemaVersionInfo));
+
+ SchemaVersionInfo otherSchemaVersionInfo = new SchemaVersionInfo(schemaVersionId + 1, "other-" + schemaName, version, schemaMetadataId + 1L,
+ "other-schema-text", System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId());
+ SchemaIdVersion otherIdVersion = new SchemaIdVersion(otherSchemaVersionInfo.getId());
+ SchemaVersionKey otherSchemaVersionKey = new SchemaVersionKey(otherSchemaVersionInfo.getName(), otherSchemaVersionInfo
+ .getVersion());
+ schemaIdWithVersionInfo.put(otherIdVersion, otherSchemaVersionInfo);
+ schemaKeyWithVersionInfo.put(otherSchemaVersionKey, otherSchemaVersionInfo);
+
+
+ SchemaVersionRetriever schemaRetriever = new SchemaVersionRetriever() {
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
+ return schemaKeyWithVersionInfo.get(key);
+ }
+
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
+ return schemaIdWithVersionInfo.get(key);
+ }
+ };
+ SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = new SchemaMetadataCache.SchemaMetadataFetcher() {
+ @Override
+ public SchemaMetadataInfo fetch(String name) throws SchemaNotFoundException {
+ return null;
+ }
+
+ @Override
+ public SchemaMetadataInfo fetch(Long id) throws SchemaNotFoundException {
+ return null;
+ }
+ };
+
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(schemaRetriever, schemaMetadataFetcher, 32, 60 * 60 * 1000L);
+
+ // invalidate key without accessing earlier.
+ for (SchemaVersionKey versionKey : schemaKeyWithVersionInfo.keySet()) {
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(versionKey));
+ }
+
+ for (SchemaIdVersion schemaIdVersion : schemaIdWithVersionInfo.keySet()) {
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
+ }
+
+ // access with all version id keys
+ for (SchemaIdVersion key : allIdVersions) {
+ SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(key));
+ Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo);
+ }
+
+ // access with version key
+ SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey));
+ Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo);
+
+ // invalidate one kind of schemaIdVersion, and all other combinations should return null
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(withVersionId));
+ for (SchemaIdVersion idVersion : allIdVersions) {
+ recvdSchemaVersionInfo = schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion));
+ Assertions.assertNull(recvdSchemaVersionInfo);
+ }
+
+ SchemaVersionInfo recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherSchemaVersionKey));
+ Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo);
+ recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherIdVersion));
+ Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo);
+
+ // all values for these keys should be non existent in cache
+ for (SchemaIdVersion idVersion : allIdVersions) {
+ Assertions.assertNull(schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)));
+ }
+
+ // all values for these keys should be loaded
+ for (SchemaIdVersion idVersion : allIdVersions) {
+ Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(idVersion)));
+ }
+
+ // all values for these keys should exist locally without loading from target
+ for (SchemaIdVersion idVersion : allIdVersions) {
+ Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)));
+ }
+
+ }
+
+ @Test
+ public void getSchemaBySchemaIdVersion() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaIdVersion.class))).thenReturn(createSchemaVersionInfo());
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L);
+
+ SchemaVersionInfo result = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L, 6)));
+
+ verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaIdVersion.class));
+ verifyNoMoreInteractions(mySchemaRetriever);
+ assertSchemaVersionInfo(result);
+ }
+
+ @Test
+ public void getSchemaBySchemaVersionKey() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo());
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L);
+
+ SchemaVersionInfo result = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+
+ verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaVersionKey.class));
+ verifyNoMoreInteractions(mySchemaRetriever);
+ assertSchemaVersionInfo(result);
+ }
+
+ @Test
+ public void getSchemaVersionInfoFromCache() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo());
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L);
+
+ // fill up the cache from remote
+ schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+ // get schemaversioninfo from the cache using different inputs
+ SchemaVersionInfo result = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+ SchemaVersionInfo resultFromSchemaVersionId = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L)));
+ SchemaVersionInfo resultFromSchemaMetadataVersion = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaIdVersion(5L, 6)));
+
+ verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaVersionKey.class));
+ verifyNoMoreInteractions(mySchemaRetriever);
+ assertSchemaVersionInfo(result);
+ assertSchemaVersionInfo(resultFromSchemaVersionId);
+ assertSchemaVersionInfo(resultFromSchemaMetadataVersion);
+ }
+
+ @Test
+ public void getSchemaVersionInfoWithoutMetadataId() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = Mockito.mock(SchemaMetadataCache.SchemaMetadataFetcher.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaIdVersion.class))).thenReturn(createSchemaVersionInfoWithoutMetadataId());
+ when(schemaMetadataFetcher.fetch("schema-name")).thenReturn(new SchemaMetadataInfo(
+ new SchemaMetadata.Builder("schema-metadata-name").type("type1").build(),
+ 5L, 0L));
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, schemaMetadataFetcher, 32,
+ 60 * 1000L);
+
+ SchemaVersionInfo result = schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L)));
+
+ verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaIdVersion.class));
+ verify(schemaMetadataFetcher).fetch("schema-name");
+ verifyNoMoreInteractions(mySchemaRetriever);
+ assertSchemaVersionInfo(result);
+ }
+ @Test
+ public void getSchemaThrowsSchemaNotFoundException() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ SchemaVersionKey versionKey = new SchemaVersionKey("schema-1", 1);
+ when(mySchemaRetriever.retrieveSchemaVersion(versionKey)).thenThrow(new SchemaNotFoundException("Schema not found.", "entity"));
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L);
+
+ SchemaNotFoundException thrown = Assertions
+ .assertThrows(SchemaNotFoundException.class, () -> {
+ schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(versionKey));
+ }, "NumberFormatException error was expected");
+
+ Assertions.assertEquals(versionKey.toString(), thrown.getMessage());
+ }
+
+ @Test
+ public void getSchemaThrowsNotFoundException() throws Exception {
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ SchemaVersionKey versionKey = new SchemaVersionKey("schema-1", 1);
+ when(mySchemaRetriever.retrieveSchemaVersion(versionKey)).thenThrow(new NotFoundException("HTTP 404."));
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L);
+
+ SchemaNotFoundException thrown = Assertions
+ .assertThrows(SchemaNotFoundException.class, () -> {
+ schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(versionKey));
+ }, "NumberFormatException error was expected");
+
+ Assertions.assertEquals(SchemaVersionInfoCache.Key.of(versionKey).toString(), thrown.getMessage());
+ }
+
+ private static SchemaVersionInfo createSchemaVersionInfo() {
+ return new SchemaVersionInfo(15L,
+ "schema-name", 6, 5L, "text", 0L, "description", null);
+ }
+ private static SchemaVersionInfo createSchemaVersionInfoWithoutMetadataId() {
+ return new SchemaVersionInfo(15L,
+ "schema-name", 6, null, "text", 0L, "description", null);
+ }
+
+ private static void assertSchemaVersionInfo(SchemaVersionInfo result) {
+ Assertions.assertEquals(15L, result.getId());
+ Assertions.assertEquals("schema-name", result.getName());
+ Assertions.assertEquals(6, result.getVersion());
+ Assertions.assertEquals(5L, result.getSchemaMetadataId());
+ Assertions.assertEquals("text", result.getSchemaText());
+ Assertions.assertEquals(0L, result.getTimestamp());
+ Assertions.assertEquals("description", result.getDescription());
+ }
+
+ @Test
+ public void itInvalidatesCaches() throws Exception {
+ ConcurrentMap metadataNameVersion2Id = new ConcurrentHashMap<>();
+ ConcurrentMap metadataIdVersion2Id = new ConcurrentHashMap<>();
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo());
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L, metadataNameVersion2Id, metadataIdVersion2Id);
+
+
+ schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+
+ Assertions.assertEquals(1, metadataNameVersion2Id.size());
+ Assertions.assertEquals(1, metadataIdVersion2Id.size());
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+ Assertions.assertEquals(0, metadataNameVersion2Id.size());
+ Assertions.assertEquals(0, metadataIdVersion2Id.size());
+ }
+
+ @Test
+ public void itInvalidatesAll() throws Exception {
+ ConcurrentMap metadataNameVersion2Id = new ConcurrentHashMap<>();
+ ConcurrentMap metadataIdVersion2Id = new ConcurrentHashMap<>();
+ SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class);
+ when(mySchemaRetriever.retrieveSchemaVersion(new SchemaVersionKey("schema-name", 6))).thenReturn(createSchemaVersionInfo());
+ when(mySchemaRetriever.retrieveSchemaVersion(new SchemaVersionKey("schema-name2", 7))).thenReturn(new SchemaVersionInfo(16L,
+ "schema-name2", 7, 6L, "text2", 2L, "description2", null));
+ SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32,
+ 60 * 1000L, metadataNameVersion2Id, metadataIdVersion2Id);
+
+ schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6)));
+ schemaVersionInfoCache.getSchema(
+ SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name2", 7)));
+
+ Assertions.assertEquals(2, metadataNameVersion2Id.size());
+ Assertions.assertEquals(2, metadataIdVersion2Id.size());
+ schemaVersionInfoCache.invalidateAll();
+ Assertions.assertEquals(0, metadataNameVersion2Id.size());
+ Assertions.assertEquals(0, metadataIdVersion2Id.size());
+ }
+}
diff --git a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java b/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java
deleted file mode 100644
index bb27310dd..000000000
--- a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Copyright 2016-2023 Cloudera, Inc.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.hortonworks.registries.schemaregistry.cache;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
-import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
-import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
-import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
-import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Loading cache for {@link Key} with values {@link SchemaVersionInfo}.
- */
-public class SchemaVersionInfoCache implements AbstractCache {
- private static final Logger LOG = LoggerFactory.getLogger(SchemaVersionInfoCache.class);
-
- private final LoadingCache loadingCache;
- private final ConcurrentMap idWithNameVersion;
- private final ConcurrentMap> nameVersionWithIds;
-
- public SchemaVersionInfoCache(final SchemaVersionRetriever schemaRetriever,
- final int schemaCacheSize,
- final long schemaCacheExpiryInMilliSecs) {
- idWithNameVersion = new ConcurrentHashMap<>(schemaCacheSize);
- nameVersionWithIds = new ConcurrentHashMap<>(schemaCacheSize);
- loadingCache = createLoadingCache(schemaRetriever, schemaCacheSize, schemaCacheExpiryInMilliSecs);
- }
-
- private LoadingCache createLoadingCache(SchemaVersionRetriever schemaRetriever,
- int schemaCacheSize,
- long schemaCacheExpiryInMilliSecs) {
- return CacheBuilder.newBuilder()
- .maximumSize(schemaCacheSize)
- .expireAfterAccess(schemaCacheExpiryInMilliSecs, TimeUnit.MILLISECONDS)
- .build(new CacheLoader() {
- @Override
- public SchemaVersionInfo load(Key key) throws Exception {
- LOG.debug("Key is not in cache: [{}]. Loading from from target service", key);
- SchemaVersionInfo schemaVersionInfo;
- if (key.schemaVersionKey != null) {
- schemaVersionInfo = schemaRetriever.retrieveSchemaVersion(key.schemaVersionKey);
- } else if (key.schemaIdVersion != null) {
- schemaVersionInfo = schemaRetriever.retrieveSchemaVersion(key.schemaIdVersion);
- } else {
- throw new IllegalArgumentException("Given argument is not valid: " + key);
- }
-
- LOG.debug("Update cache for entry {}", schemaVersionInfo);
- updateCacheInvalidationEntries(schemaVersionInfo);
- LOG.trace("Return version {}", schemaVersionInfo);
- return schemaVersionInfo;
- }
- });
- }
-
- private void updateCacheInvalidationEntries(SchemaVersionInfo schemaVersionInfo) {
- // need to support this as SchemaIdVersion supports multiple ways to construct for backward compatible APIs
- // this would have been simple without that.
- SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaVersionInfo.getName(), schemaVersionInfo.getVersion());
- SchemaIdVersion key1 = new SchemaIdVersion(schemaVersionInfo.getId());
- idWithNameVersion.putIfAbsent(key1, schemaVersionKey);
- Long schemaMetadataId = schemaVersionInfo.getSchemaMetadataId();
-
- // schemaMetadataId can be null from earlier registry instances.
- if (schemaMetadataId != null) {
- SchemaIdVersion key2 = new SchemaIdVersion(schemaMetadataId, schemaVersionInfo.getVersion());
- nameVersionWithIds.putIfAbsent(schemaVersionKey, Lists.newArrayList(key1, key2));
- idWithNameVersion.putIfAbsent(key2, schemaVersionKey);
- } else {
- nameVersionWithIds.putIfAbsent(schemaVersionKey, Collections.singletonList(key1));
- }
- }
-
- public SchemaVersionInfo getSchema(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException {
- try {
- LOG.debug("Trying to load entry for cache with key [{}] from target service", key);
- SchemaVersionInfo schemaVersionInfo = loadingCache.get(key);
- LOG.trace("Result: {}", schemaVersionInfo);
- return schemaVersionInfo;
- } catch (ExecutionException e) {
- if (e.getCause().getClass() == SchemaNotFoundException.class) {
- SchemaNotFoundException exception = (SchemaNotFoundException) e.getCause();
- if (key.schemaVersionKey != null) {
- throw new SchemaNotFoundException(exception.getMessage(), key.schemaVersionKey.toString());
- } else {
- throw exception;
- }
- }
- throw new RuntimeException(e);
- } catch (UncheckedExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof NotFoundException) {
- throw new SchemaNotFoundException(key.toString(), cause);
- } else if (cause instanceof RuntimeException) {
- // Do not expose cache implementation details to the caller
- throw (RuntimeException) cause;
- } else {
- // Should not happen, best option is to rethrow
- throw e;
- }
- }
- }
-
- public SchemaVersionInfo getSchemaIfPresent(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException {
- LOG.debug("Trying to get entry from cache if it is present in local cache with key [{}]", key);
- return loadingCache.getIfPresent(key);
- }
-
- public void invalidateSchema(SchemaVersionInfoCache.Key key) {
- LOG.debug("Invalidating cache entry for key [{}]", key);
- loadingCache.invalidate(key);
-
- SchemaVersionKey schemaVersionKey =
- key.schemaIdVersion != null ? idWithNameVersion.get(key.schemaIdVersion) : key.schemaVersionKey;
-
- // it can be null if it is not accessed earlier.
- if (schemaVersionKey != null) {
- loadingCache.invalidate(Key.of(schemaVersionKey));
- List schemaIdVersions = nameVersionWithIds.get(schemaVersionKey);
- if (schemaIdVersions != null) {
- for (SchemaIdVersion schemaIdVersion : schemaIdVersions) {
- loadingCache.invalidate(Key.of(schemaIdVersion));
- }
- }
- }
- }
-
- public void invalidateAll() {
- LOG.info("Invalidating all the cache entries");
-
- loadingCache.invalidateAll();
- }
-
- @Override
- public SchemaRegistryCacheType getCacheType() {
- return SchemaRegistryCacheType.SCHEMA_VERSION_CACHE;
- }
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class Key {
-
- @JsonProperty
- private SchemaVersionKey schemaVersionKey;
-
- @JsonProperty
- private SchemaIdVersion schemaIdVersion;
-
- public Key(SchemaVersionKey schemaVersionKey) {
- this.schemaVersionKey = schemaVersionKey;
- }
-
- public Key(SchemaIdVersion schemaIdVersion) {
- this.schemaIdVersion = schemaIdVersion;
- }
-
- public static Key of(SchemaVersionKey schemaVersionKey) {
- return new Key(schemaVersionKey);
- }
-
- public static Key of(SchemaIdVersion schemaIdVersion) {
- return new Key(schemaIdVersion);
- }
-
- // For JSON serialization/deserialization
- private Key() {
-
- }
-
- @Override
- public String toString() {
- return "Key {" +
- "schemaVersionKey=" + schemaVersionKey +
- ", schemaIdVersion=" + schemaIdVersion +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Key key = (Key) o;
-
- if (schemaVersionKey != null ? !schemaVersionKey.equals(key.schemaVersionKey)
- : key.schemaVersionKey != null) {
- return false;
- }
- return schemaIdVersion != null ? schemaIdVersion.equals(key.schemaIdVersion) : key.schemaIdVersion == null;
- }
-
- @Override
- public int hashCode() {
- int result = schemaVersionKey != null ? schemaVersionKey.hashCode() : 0;
- result = 31 * result + (schemaIdVersion != null ? schemaIdVersion.hashCode() : 0);
- return result;
- }
- }
-
-}
diff --git a/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java b/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java
deleted file mode 100644
index cf1e2df9e..000000000
--- a/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright 2016-2019 Cloudera, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.hortonworks.registries.schemaregistry;
-
-import com.google.common.collect.Sets;
-import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache;
-import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
-import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-/**
- *
- */
-public class SchemaVersionInfoCacheTest {
-
- @Test
- public void testSchemaVersionCache() throws Exception {
-
- final Map schemaIdWithVersionInfo = new HashMap<>();
- final Map schemaKeyWithVersionInfo = new HashMap<>();
-
- long schemaMetadataId = 1L;
- String schemaName = "schema-1";
- Integer version = 2;
- long schemaVersionId = 3L;
- SchemaVersionInfo schemaVersionInfo =
- new SchemaVersionInfo(schemaVersionId, schemaName, version, schemaMetadataId, "schema-text",
- System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId());
- SchemaIdVersion withVersionId = new SchemaIdVersion(schemaVersionId);
- SchemaIdVersion withMetaIdAndVersion = new SchemaIdVersion(schemaMetadataId, version);
- SchemaIdVersion withBoth = new SchemaIdVersion(schemaMetadataId, version, schemaVersionId);
- SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
- schemaKeyWithVersionInfo.put(schemaVersionKey, schemaVersionInfo);
- HashSet allIdVersions = Sets.newHashSet(withVersionId, withMetaIdAndVersion, withBoth);
- allIdVersions.stream().forEach(x -> schemaIdWithVersionInfo.put(x, schemaVersionInfo));
-
- SchemaVersionInfo otherSchemaVersionInfo = new SchemaVersionInfo(schemaVersionId + 1, "other-" + schemaName, version, schemaMetadataId + 1L,
- "other-schema-text", System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId());
- SchemaIdVersion otherIdVersion = new SchemaIdVersion(otherSchemaVersionInfo.getId());
- SchemaVersionKey otherSchemaVersionKey = new SchemaVersionKey(otherSchemaVersionInfo.getName(), otherSchemaVersionInfo
- .getVersion());
- schemaIdWithVersionInfo.put(otherIdVersion, otherSchemaVersionInfo);
- schemaKeyWithVersionInfo.put(otherSchemaVersionKey, otherSchemaVersionInfo);
-
-
- SchemaVersionRetriever schemaRetriever = new SchemaVersionRetriever() {
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
- return schemaKeyWithVersionInfo.get(key);
- }
-
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
- return schemaIdWithVersionInfo.get(key);
- }
- };
-
- SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(schemaRetriever, 32, 60 * 1000L);
-
- // invalidate key without accessing earlier.
- for (SchemaVersionKey versionKey : schemaKeyWithVersionInfo.keySet()) {
- schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(versionKey));
- }
-
- for (SchemaIdVersion schemaIdVersion : schemaIdWithVersionInfo.keySet()) {
- schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
- }
-
- // access with all version id keys
- for (SchemaIdVersion key : allIdVersions) {
- SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(key));
- Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo);
- }
-
- // access with version key
- SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey));
- Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo);
-
- // invalidate one kind of schemaIdVersion, and all other combinations should return null
- schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(withVersionId));
- for (SchemaIdVersion idVersion : allIdVersions) {
- recvdSchemaVersionInfo = schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion));
- Assertions.assertNull(recvdSchemaVersionInfo);
- }
-
- SchemaVersionInfo recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherSchemaVersionKey));
- Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo);
- recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherIdVersion));
- Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo);
-
- // all values for these keys should be non existent in cache
- for (SchemaIdVersion idVersion : allIdVersions) {
- Assertions.assertNull(schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)));
- }
-
- // all values for these keys should be loaded
- for (SchemaIdVersion idVersion : allIdVersions) {
- Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(idVersion)));
- }
-
- // all values for these keys should exist locally without loading from target
- for (SchemaIdVersion idVersion : allIdVersions) {
- Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)));
- }
-
- }
-}