diff --git a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java similarity index 80% rename from schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java rename to schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java index d676edd9b..ca8d63f6e 100644 --- a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/AbstractCache.java +++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/AbstractCache.java @@ -1,5 +1,5 @@ /** - * Copyright 2018-2019 Cloudera, Inc. + * Copyright 2018-2023 Cloudera, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,9 @@ * limitations under the License. **/ -package com.hortonworks.registries.schemaregistry.cache; +package com.hortonworks.registries.schemaregistry.client; + +import com.hortonworks.registries.schemaregistry.cache.SchemaRegistryCacheType; public interface AbstractCache { diff --git a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java index e44bbefdf..e3fe548f4 100644 --- a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java +++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java @@ -45,7 +45,6 @@ import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever; import com.hortonworks.registries.schemaregistry.SerDesInfo; import com.hortonworks.registries.schemaregistry.SerDesPair; -import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache; import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException; import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaBranchDeletionException; import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException; @@ -339,19 +338,20 @@ private SchemaMetadataCache createSchemaMetadataCache() { private SchemaVersionInfoCache createSchemaVersionInfoCache() { return new SchemaVersionInfoCache( - new SchemaVersionRetriever() { - @Override - public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException { - return doGetSchemaVersionInfo(key); - } + new SchemaVersionRetriever() { + @Override + public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException { + return doGetSchemaVersionInfo(key); + } - @Override - public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException { - return doGetSchemaVersionInfo(key); - } - }, - ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(), - ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L + @Override + public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException { + return doGetSchemaVersionInfo(key); + } + }, + createSchemaMetadataFetcher(), + ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(), + ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L ); } diff --git a/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java new file mode 100644 index 000000000..72f18a9ae --- /dev/null +++ b/schema-registry/schema-registry-client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCache.java @@ -0,0 +1,318 @@ +/** + * Copyright 2016-2023 Cloudera, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.hortonworks.registries.schemaregistry.client; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.hortonworks.registries.schemaregistry.SchemaIdVersion; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionKey; +import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever; +import com.hortonworks.registries.schemaregistry.cache.SchemaRegistryCacheType; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckForNull; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static com.google.common.cache.RemovalCause.REPLACED; + +/** + * Loading cache for {@link Key} with values {@link SchemaVersionInfo}. + */ +public class SchemaVersionInfoCache implements AbstractCache { + private static final Logger LOG = LoggerFactory.getLogger(SchemaVersionInfoCache.class); + + private final Cache cache; + private final ConcurrentMap metadataNameVersion2Id; + private final ConcurrentMap metadataIdVersion2Id; + private final SchemaVersionRetriever schemaVersionRetriever; + + private final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher; + + SchemaVersionInfoCache(final SchemaVersionRetriever schemaVersionRetriever, + final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher, + final int schemaCacheSize, + final long schemaCacheExpiryInMilliSecs, + ConcurrentMap metadataNameVersion2Id, + ConcurrentMap metadataIdVersion2Id + ) { + this.metadataNameVersion2Id = metadataNameVersion2Id; + this.metadataIdVersion2Id = metadataIdVersion2Id; + this.schemaVersionRetriever = schemaVersionRetriever; + this.schemaMetadataFetcher = schemaMetadataFetcher; + cache = createCache(schemaCacheSize, schemaCacheExpiryInMilliSecs); + } + + public SchemaVersionInfoCache(final SchemaVersionRetriever schemaVersionRetriever, + final SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher, + final int schemaCacheSize, + final long schemaCacheExpiryInMilliSecs) { + this(schemaVersionRetriever, schemaMetadataFetcher, schemaCacheSize, schemaCacheExpiryInMilliSecs, + new ConcurrentHashMap<>(schemaCacheSize), + new ConcurrentHashMap<>(schemaCacheSize)); + } + + public SchemaVersionInfo getSchema(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException { + try { + LOG.debug("Trying to load entry for cache with key [{}] from target service", key); + SchemaVersionInfo schemaVersionInfo = null; + if (key.schemaVersionKey != null) { + schemaVersionInfo = getSchemaVersionInfoFromSchemaVersionKey(key.schemaVersionKey); + } else if (key.schemaIdVersion != null) { + schemaVersionInfo = getSchemaVersionInfoFromSchemaIdVersion(key.schemaIdVersion); + } else { + throw new IllegalArgumentException("Given argument is not valid: " + key); + } + LOG.trace("Result: {}", schemaVersionInfo); + return schemaVersionInfo; + } catch (SchemaNotFoundException snfe) { + if (key.schemaVersionKey != null) { + throw new SchemaNotFoundException(key.schemaVersionKey.toString(), snfe.getMessage()); + } else { + throw snfe; + } + } catch (NotFoundException nfe) { + throw new SchemaNotFoundException(key.toString(), nfe); + } + } + + @CheckForNull + public SchemaVersionInfo getSchemaIfPresent(SchemaVersionInfoCache.Key key) { + LOG.debug("Trying to get entry from cache if it is present in local cache with key [{}]", key); + Long id = null; + if (key.schemaVersionKey != null) { + id = metadataNameVersion2Id.getOrDefault(key.schemaVersionKey, null); + } else if (key.schemaIdVersion != null) { + id = getIdFromSchemaIdVersionOrCache(key.schemaIdVersion); + } + if (id != null) { + return cache.getIfPresent(id); + } + return null; + } + + public void invalidateSchema(SchemaVersionInfoCache.Key key) { + LOG.debug("Invalidating cache entry for key [{}]", key); + if (key.schemaVersionKey != null) { + invalidateBySchemaVersionKey(key.schemaVersionKey); + } else if (key.schemaIdVersion != null) { + invalidateBySchemaIdVersion(key.schemaIdVersion); + } else { + throw new IllegalArgumentException("Given argument is not valid: " + key); + } + } + + public void invalidateAll() { + cache.invalidateAll(); + } + + @Override + public SchemaRegistryCacheType getCacheType() { + return SchemaRegistryCacheType.SCHEMA_VERSION_CACHE; + } + + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Key { + + @JsonProperty + private SchemaVersionKey schemaVersionKey; + + @JsonProperty + private SchemaIdVersion schemaIdVersion; + + public Key(SchemaVersionKey schemaVersionKey) { + this.schemaVersionKey = schemaVersionKey; + } + + public Key(SchemaIdVersion schemaIdVersion) { + this.schemaIdVersion = schemaIdVersion; + } + + public static Key of(SchemaVersionKey schemaVersionKey) { + return new Key(schemaVersionKey); + } + + public static Key of(SchemaIdVersion schemaIdVersion) { + return new Key(schemaIdVersion); + } + + // For JSON serialization/deserialization + private Key() { + + } + + @Override + public String toString() { + return "Key {" + + "schemaVersionKey=" + schemaVersionKey + + ", schemaIdVersion=" + schemaIdVersion + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Key key = (Key) o; + + if (schemaVersionKey != null ? !schemaVersionKey.equals(key.schemaVersionKey) + : key.schemaVersionKey != null) { + return false; + } + return schemaIdVersion != null ? schemaIdVersion.equals(key.schemaIdVersion) : key.schemaIdVersion == null; + } + + @Override + public int hashCode() { + int result = schemaVersionKey != null ? schemaVersionKey.hashCode() : 0; + result = 31 * result + (schemaIdVersion != null ? schemaIdVersion.hashCode() : 0); + return result; + } + } + + private SchemaVersionInfo getSchemaVersionInfoFromSchemaIdVersion(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException { + Long id = getIdFromSchemaIdVersionOrCache(schemaIdVersion); + if (id != null) { + SchemaVersionInfo schemaVersionInfo = cache.getIfPresent(id); + if (schemaVersionInfo != null) { + return schemaVersionInfo; + } + } + SchemaVersionInfo schemaVersionInfo = schemaVersionRetriever.retrieveSchemaVersion(schemaIdVersion); + schemaVersionInfo = updateSchemaVersionInfoIfSchemaMetadataIdIsNull(schemaVersionInfo, schemaIdVersion.getSchemaMetadataId()); + updateCaches(schemaVersionInfo, createSchemaVersionKeyFromSchemaVersionInfo(schemaVersionInfo)); + return schemaVersionInfo; + } + + private SchemaVersionInfo updateSchemaVersionInfoIfSchemaMetadataIdIsNull(SchemaVersionInfo schemaVersionInfo, + Long schemaMetadataId) throws SchemaNotFoundException { + if (schemaVersionInfo.getSchemaMetadataId() != null) { + return schemaVersionInfo; + } + Long metadataId = schemaMetadataId; + if (metadataId == null) { + SchemaMetadataInfo schemaMetadataInfo = schemaMetadataFetcher.fetch(schemaVersionInfo.getName()); + if (schemaMetadataInfo == null) { + throw new SchemaNotFoundException(schemaVersionInfo.getName(), "Could not find schema."); + } + metadataId = schemaMetadataInfo.getId(); + } + return new SchemaVersionInfo(schemaVersionInfo.getId(), + schemaVersionInfo.getName(), + schemaVersionInfo.getVersion(), + metadataId, + schemaVersionInfo.getSchemaText(), + schemaVersionInfo.getTimestamp(), + schemaVersionInfo.getDescription(), + schemaVersionInfo.getStateId() + ); + } + + private void updateCaches(SchemaVersionInfo schemaVersionInfo, SchemaVersionKey schemaVersionKey) { + long id = schemaVersionInfo.getId(); + metadataNameVersion2Id.put(schemaVersionKey, id); + metadataIdVersion2Id.put(createSchemaIdVersionFromSchemaVersionInfo(schemaVersionInfo), id); + cache.put(id, schemaVersionInfo); + } + + private SchemaVersionInfo getSchemaVersionInfoFromSchemaVersionKey(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException { + SchemaVersionInfo schemaVersionInfo = getSchemaVersionInfoFromCache(schemaVersionKey); + if (schemaVersionInfo != null) { + return schemaVersionInfo; + } + schemaVersionInfo = schemaVersionRetriever.retrieveSchemaVersion(schemaVersionKey); + schemaVersionInfo = updateSchemaVersionInfoIfSchemaMetadataIdIsNull(schemaVersionInfo, null); + updateCaches(schemaVersionInfo, schemaVersionKey); + + return schemaVersionInfo; + } + + private SchemaVersionInfo getSchemaVersionInfoFromCache(SchemaVersionKey schemaVersionKey) { + Long id = metadataNameVersion2Id.getOrDefault(schemaVersionKey, null); + if (id != null) { + return cache.getIfPresent(id); + } + return null; + } + + private void invalidateInternalCaches(SchemaVersionInfo schemaVersionInfo) { + SchemaVersionKey schemaVersionKey = createSchemaVersionKeyFromSchemaVersionInfo(schemaVersionInfo); + LOG.trace("Invalidate schemaVersionKey: {}", schemaVersionKey); + metadataNameVersion2Id.remove(schemaVersionKey); + SchemaIdVersion schemaIdVersion = createSchemaIdVersionFromSchemaVersionInfo(schemaVersionInfo); + LOG.trace("Invalidate schemaIdVersion: {}", schemaIdVersion); + metadataIdVersion2Id.remove(schemaIdVersion); + } + + private static SchemaIdVersion createSchemaIdVersionFromSchemaVersionInfo(SchemaVersionInfo schemaVersionInfo) { + return new SchemaIdVersion(schemaVersionInfo.getSchemaMetadataId(), schemaVersionInfo.getVersion()); + } + + private static SchemaVersionKey createSchemaVersionKeyFromSchemaVersionInfo(SchemaVersionInfo schemaVersionInfo) { + return new SchemaVersionKey(schemaVersionInfo.getName(), schemaVersionInfo.getVersion()); + } + + private Cache createCache(int schemaCacheSize, + long schemaCacheExpiryInMilliSecs) { + return CacheBuilder.newBuilder() + .maximumSize(schemaCacheSize) + .expireAfterAccess(schemaCacheExpiryInMilliSecs, TimeUnit.MILLISECONDS) + .removalListener((RemovalListener) notification -> { + if (notification.getCause() != REPLACED) { + SchemaVersionInfo id = notification.getValue(); + if (id != null) { + invalidateInternalCaches(id); + } + } + }) + .build(); + } + + private void invalidateBySchemaIdVersion(SchemaIdVersion key) { + Long id = getIdFromSchemaIdVersionOrCache(key); + if (id != null) { + cache.invalidate(id); + } + } + + private Long getIdFromSchemaIdVersionOrCache(SchemaIdVersion key) { + Long id = key.getSchemaVersionId(); + if (id == null) { + id = metadataIdVersion2Id.getOrDefault(key, null); + } + return id; + } + + private void invalidateBySchemaVersionKey(SchemaVersionKey key) { + Long id = metadataNameVersion2Id.getOrDefault(key, null); + if (id != null) { + cache.invalidate(id); + } + } +} diff --git a/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java b/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java new file mode 100644 index 000000000..3313aedfe --- /dev/null +++ b/schema-registry/schema-registry-client/src/test/java/com/hortonworks/registries/schemaregistry/client/SchemaVersionInfoCacheTest.java @@ -0,0 +1,318 @@ +/* + * Copyright 2016-2019 Cloudera, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.client; + +import com.google.common.collect.Sets; +import com.hortonworks.registries.schemaregistry.SchemaIdVersion; +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionKey; +import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates; +import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +/** + * + */ +public class SchemaVersionInfoCacheTest { + + @Test + public void testSchemaVersionCache() throws Exception { + + final Map schemaIdWithVersionInfo = new HashMap<>(); + final Map schemaKeyWithVersionInfo = new HashMap<>(); + + long schemaMetadataId = 1L; + String schemaName = "schema-1"; + Integer version = 2; + long schemaVersionId = 3L; + SchemaVersionInfo schemaVersionInfo = + new SchemaVersionInfo(schemaVersionId, schemaName, version, schemaMetadataId, "schema-text", + System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); + SchemaIdVersion withVersionId = new SchemaIdVersion(schemaVersionId); + SchemaIdVersion withMetaIdAndVersion = new SchemaIdVersion(schemaMetadataId, version); + SchemaIdVersion withBoth = new SchemaIdVersion(schemaMetadataId, version, schemaVersionId); + SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + schemaKeyWithVersionInfo.put(schemaVersionKey, schemaVersionInfo); + HashSet allIdVersions = Sets.newHashSet(withVersionId, withMetaIdAndVersion, withBoth); + allIdVersions.stream().forEach(x -> schemaIdWithVersionInfo.put(x, schemaVersionInfo)); + + SchemaVersionInfo otherSchemaVersionInfo = new SchemaVersionInfo(schemaVersionId + 1, "other-" + schemaName, version, schemaMetadataId + 1L, + "other-schema-text", System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); + SchemaIdVersion otherIdVersion = new SchemaIdVersion(otherSchemaVersionInfo.getId()); + SchemaVersionKey otherSchemaVersionKey = new SchemaVersionKey(otherSchemaVersionInfo.getName(), otherSchemaVersionInfo + .getVersion()); + schemaIdWithVersionInfo.put(otherIdVersion, otherSchemaVersionInfo); + schemaKeyWithVersionInfo.put(otherSchemaVersionKey, otherSchemaVersionInfo); + + + SchemaVersionRetriever schemaRetriever = new SchemaVersionRetriever() { + @Override + public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException { + return schemaKeyWithVersionInfo.get(key); + } + + @Override + public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException { + return schemaIdWithVersionInfo.get(key); + } + }; + SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = new SchemaMetadataCache.SchemaMetadataFetcher() { + @Override + public SchemaMetadataInfo fetch(String name) throws SchemaNotFoundException { + return null; + } + + @Override + public SchemaMetadataInfo fetch(Long id) throws SchemaNotFoundException { + return null; + } + }; + + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(schemaRetriever, schemaMetadataFetcher, 32, 60 * 60 * 1000L); + + // invalidate key without accessing earlier. + for (SchemaVersionKey versionKey : schemaKeyWithVersionInfo.keySet()) { + schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(versionKey)); + } + + for (SchemaIdVersion schemaIdVersion : schemaIdWithVersionInfo.keySet()) { + schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion)); + } + + // access with all version id keys + for (SchemaIdVersion key : allIdVersions) { + SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(key)); + Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo); + } + + // access with version key + SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey)); + Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo); + + // invalidate one kind of schemaIdVersion, and all other combinations should return null + schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(withVersionId)); + for (SchemaIdVersion idVersion : allIdVersions) { + recvdSchemaVersionInfo = schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)); + Assertions.assertNull(recvdSchemaVersionInfo); + } + + SchemaVersionInfo recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherSchemaVersionKey)); + Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo); + recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherIdVersion)); + Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo); + + // all values for these keys should be non existent in cache + for (SchemaIdVersion idVersion : allIdVersions) { + Assertions.assertNull(schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion))); + } + + // all values for these keys should be loaded + for (SchemaIdVersion idVersion : allIdVersions) { + Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(idVersion))); + } + + // all values for these keys should exist locally without loading from target + for (SchemaIdVersion idVersion : allIdVersions) { + Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion))); + } + + } + + @Test + public void getSchemaBySchemaIdVersion() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaIdVersion.class))).thenReturn(createSchemaVersionInfo()); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L); + + SchemaVersionInfo result = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L, 6))); + + verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaIdVersion.class)); + verifyNoMoreInteractions(mySchemaRetriever); + assertSchemaVersionInfo(result); + } + + @Test + public void getSchemaBySchemaVersionKey() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo()); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L); + + SchemaVersionInfo result = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + + verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaVersionKey.class)); + verifyNoMoreInteractions(mySchemaRetriever); + assertSchemaVersionInfo(result); + } + + @Test + public void getSchemaVersionInfoFromCache() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo()); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L); + + // fill up the cache from remote + schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + // get schemaversioninfo from the cache using different inputs + SchemaVersionInfo result = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + SchemaVersionInfo resultFromSchemaVersionId = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L))); + SchemaVersionInfo resultFromSchemaMetadataVersion = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaIdVersion(5L, 6))); + + verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaVersionKey.class)); + verifyNoMoreInteractions(mySchemaRetriever); + assertSchemaVersionInfo(result); + assertSchemaVersionInfo(resultFromSchemaVersionId); + assertSchemaVersionInfo(resultFromSchemaMetadataVersion); + } + + @Test + public void getSchemaVersionInfoWithoutMetadataId() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = Mockito.mock(SchemaMetadataCache.SchemaMetadataFetcher.class); + when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaIdVersion.class))).thenReturn(createSchemaVersionInfoWithoutMetadataId()); + when(schemaMetadataFetcher.fetch("schema-name")).thenReturn(new SchemaMetadataInfo( + new SchemaMetadata.Builder("schema-metadata-name").type("type1").build(), + 5L, 0L)); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, schemaMetadataFetcher, 32, + 60 * 1000L); + + SchemaVersionInfo result = schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaIdVersion(15L))); + + verify(mySchemaRetriever).retrieveSchemaVersion(any(SchemaIdVersion.class)); + verify(schemaMetadataFetcher).fetch("schema-name"); + verifyNoMoreInteractions(mySchemaRetriever); + assertSchemaVersionInfo(result); + } + @Test + public void getSchemaThrowsSchemaNotFoundException() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + SchemaVersionKey versionKey = new SchemaVersionKey("schema-1", 1); + when(mySchemaRetriever.retrieveSchemaVersion(versionKey)).thenThrow(new SchemaNotFoundException("Schema not found.", "entity")); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L); + + SchemaNotFoundException thrown = Assertions + .assertThrows(SchemaNotFoundException.class, () -> { + schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(versionKey)); + }, "NumberFormatException error was expected"); + + Assertions.assertEquals(versionKey.toString(), thrown.getMessage()); + } + + @Test + public void getSchemaThrowsNotFoundException() throws Exception { + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + SchemaVersionKey versionKey = new SchemaVersionKey("schema-1", 1); + when(mySchemaRetriever.retrieveSchemaVersion(versionKey)).thenThrow(new NotFoundException("HTTP 404.")); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L); + + SchemaNotFoundException thrown = Assertions + .assertThrows(SchemaNotFoundException.class, () -> { + schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(versionKey)); + }, "NumberFormatException error was expected"); + + Assertions.assertEquals(SchemaVersionInfoCache.Key.of(versionKey).toString(), thrown.getMessage()); + } + + private static SchemaVersionInfo createSchemaVersionInfo() { + return new SchemaVersionInfo(15L, + "schema-name", 6, 5L, "text", 0L, "description", null); + } + private static SchemaVersionInfo createSchemaVersionInfoWithoutMetadataId() { + return new SchemaVersionInfo(15L, + "schema-name", 6, null, "text", 0L, "description", null); + } + + private static void assertSchemaVersionInfo(SchemaVersionInfo result) { + Assertions.assertEquals(15L, result.getId()); + Assertions.assertEquals("schema-name", result.getName()); + Assertions.assertEquals(6, result.getVersion()); + Assertions.assertEquals(5L, result.getSchemaMetadataId()); + Assertions.assertEquals("text", result.getSchemaText()); + Assertions.assertEquals(0L, result.getTimestamp()); + Assertions.assertEquals("description", result.getDescription()); + } + + @Test + public void itInvalidatesCaches() throws Exception { + ConcurrentMap metadataNameVersion2Id = new ConcurrentHashMap<>(); + ConcurrentMap metadataIdVersion2Id = new ConcurrentHashMap<>(); + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + when(mySchemaRetriever.retrieveSchemaVersion(any(SchemaVersionKey.class))).thenReturn(createSchemaVersionInfo()); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L, metadataNameVersion2Id, metadataIdVersion2Id); + + + schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + + Assertions.assertEquals(1, metadataNameVersion2Id.size()); + Assertions.assertEquals(1, metadataIdVersion2Id.size()); + schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + Assertions.assertEquals(0, metadataNameVersion2Id.size()); + Assertions.assertEquals(0, metadataIdVersion2Id.size()); + } + + @Test + public void itInvalidatesAll() throws Exception { + ConcurrentMap metadataNameVersion2Id = new ConcurrentHashMap<>(); + ConcurrentMap metadataIdVersion2Id = new ConcurrentHashMap<>(); + SchemaVersionRetriever mySchemaRetriever = Mockito.mock(SchemaVersionRetriever.class); + when(mySchemaRetriever.retrieveSchemaVersion(new SchemaVersionKey("schema-name", 6))).thenReturn(createSchemaVersionInfo()); + when(mySchemaRetriever.retrieveSchemaVersion(new SchemaVersionKey("schema-name2", 7))).thenReturn(new SchemaVersionInfo(16L, + "schema-name2", 7, 6L, "text2", 2L, "description2", null)); + SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(mySchemaRetriever, null, 32, + 60 * 1000L, metadataNameVersion2Id, metadataIdVersion2Id); + + schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name", 6))); + schemaVersionInfoCache.getSchema( + SchemaVersionInfoCache.Key.of(new SchemaVersionKey("schema-name2", 7))); + + Assertions.assertEquals(2, metadataNameVersion2Id.size()); + Assertions.assertEquals(2, metadataIdVersion2Id.size()); + schemaVersionInfoCache.invalidateAll(); + Assertions.assertEquals(0, metadataNameVersion2Id.size()); + Assertions.assertEquals(0, metadataIdVersion2Id.size()); + } +} diff --git a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java b/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java deleted file mode 100644 index bb27310dd..000000000 --- a/schema-registry/schema-registry-common/src/main/java/com/hortonworks/registries/schemaregistry/cache/SchemaVersionInfoCache.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Copyright 2016-2023 Cloudera, Inc. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package com.hortonworks.registries.schemaregistry.cache; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.UncheckedExecutionException; -import com.hortonworks.registries.schemaregistry.SchemaIdVersion; -import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; -import com.hortonworks.registries.schemaregistry.SchemaVersionKey; -import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever; -import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; -import com.hortonworks.registries.shaded.javax.ws.rs.NotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -/** - * Loading cache for {@link Key} with values {@link SchemaVersionInfo}. - */ -public class SchemaVersionInfoCache implements AbstractCache { - private static final Logger LOG = LoggerFactory.getLogger(SchemaVersionInfoCache.class); - - private final LoadingCache loadingCache; - private final ConcurrentMap idWithNameVersion; - private final ConcurrentMap> nameVersionWithIds; - - public SchemaVersionInfoCache(final SchemaVersionRetriever schemaRetriever, - final int schemaCacheSize, - final long schemaCacheExpiryInMilliSecs) { - idWithNameVersion = new ConcurrentHashMap<>(schemaCacheSize); - nameVersionWithIds = new ConcurrentHashMap<>(schemaCacheSize); - loadingCache = createLoadingCache(schemaRetriever, schemaCacheSize, schemaCacheExpiryInMilliSecs); - } - - private LoadingCache createLoadingCache(SchemaVersionRetriever schemaRetriever, - int schemaCacheSize, - long schemaCacheExpiryInMilliSecs) { - return CacheBuilder.newBuilder() - .maximumSize(schemaCacheSize) - .expireAfterAccess(schemaCacheExpiryInMilliSecs, TimeUnit.MILLISECONDS) - .build(new CacheLoader() { - @Override - public SchemaVersionInfo load(Key key) throws Exception { - LOG.debug("Key is not in cache: [{}]. Loading from from target service", key); - SchemaVersionInfo schemaVersionInfo; - if (key.schemaVersionKey != null) { - schemaVersionInfo = schemaRetriever.retrieveSchemaVersion(key.schemaVersionKey); - } else if (key.schemaIdVersion != null) { - schemaVersionInfo = schemaRetriever.retrieveSchemaVersion(key.schemaIdVersion); - } else { - throw new IllegalArgumentException("Given argument is not valid: " + key); - } - - LOG.debug("Update cache for entry {}", schemaVersionInfo); - updateCacheInvalidationEntries(schemaVersionInfo); - LOG.trace("Return version {}", schemaVersionInfo); - return schemaVersionInfo; - } - }); - } - - private void updateCacheInvalidationEntries(SchemaVersionInfo schemaVersionInfo) { - // need to support this as SchemaIdVersion supports multiple ways to construct for backward compatible APIs - // this would have been simple without that. - SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaVersionInfo.getName(), schemaVersionInfo.getVersion()); - SchemaIdVersion key1 = new SchemaIdVersion(schemaVersionInfo.getId()); - idWithNameVersion.putIfAbsent(key1, schemaVersionKey); - Long schemaMetadataId = schemaVersionInfo.getSchemaMetadataId(); - - // schemaMetadataId can be null from earlier registry instances. - if (schemaMetadataId != null) { - SchemaIdVersion key2 = new SchemaIdVersion(schemaMetadataId, schemaVersionInfo.getVersion()); - nameVersionWithIds.putIfAbsent(schemaVersionKey, Lists.newArrayList(key1, key2)); - idWithNameVersion.putIfAbsent(key2, schemaVersionKey); - } else { - nameVersionWithIds.putIfAbsent(schemaVersionKey, Collections.singletonList(key1)); - } - } - - public SchemaVersionInfo getSchema(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException { - try { - LOG.debug("Trying to load entry for cache with key [{}] from target service", key); - SchemaVersionInfo schemaVersionInfo = loadingCache.get(key); - LOG.trace("Result: {}", schemaVersionInfo); - return schemaVersionInfo; - } catch (ExecutionException e) { - if (e.getCause().getClass() == SchemaNotFoundException.class) { - SchemaNotFoundException exception = (SchemaNotFoundException) e.getCause(); - if (key.schemaVersionKey != null) { - throw new SchemaNotFoundException(exception.getMessage(), key.schemaVersionKey.toString()); - } else { - throw exception; - } - } - throw new RuntimeException(e); - } catch (UncheckedExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof NotFoundException) { - throw new SchemaNotFoundException(key.toString(), cause); - } else if (cause instanceof RuntimeException) { - // Do not expose cache implementation details to the caller - throw (RuntimeException) cause; - } else { - // Should not happen, best option is to rethrow - throw e; - } - } - } - - public SchemaVersionInfo getSchemaIfPresent(SchemaVersionInfoCache.Key key) throws SchemaNotFoundException { - LOG.debug("Trying to get entry from cache if it is present in local cache with key [{}]", key); - return loadingCache.getIfPresent(key); - } - - public void invalidateSchema(SchemaVersionInfoCache.Key key) { - LOG.debug("Invalidating cache entry for key [{}]", key); - loadingCache.invalidate(key); - - SchemaVersionKey schemaVersionKey = - key.schemaIdVersion != null ? idWithNameVersion.get(key.schemaIdVersion) : key.schemaVersionKey; - - // it can be null if it is not accessed earlier. - if (schemaVersionKey != null) { - loadingCache.invalidate(Key.of(schemaVersionKey)); - List schemaIdVersions = nameVersionWithIds.get(schemaVersionKey); - if (schemaIdVersions != null) { - for (SchemaIdVersion schemaIdVersion : schemaIdVersions) { - loadingCache.invalidate(Key.of(schemaIdVersion)); - } - } - } - } - - public void invalidateAll() { - LOG.info("Invalidating all the cache entries"); - - loadingCache.invalidateAll(); - } - - @Override - public SchemaRegistryCacheType getCacheType() { - return SchemaRegistryCacheType.SCHEMA_VERSION_CACHE; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - public static class Key { - - @JsonProperty - private SchemaVersionKey schemaVersionKey; - - @JsonProperty - private SchemaIdVersion schemaIdVersion; - - public Key(SchemaVersionKey schemaVersionKey) { - this.schemaVersionKey = schemaVersionKey; - } - - public Key(SchemaIdVersion schemaIdVersion) { - this.schemaIdVersion = schemaIdVersion; - } - - public static Key of(SchemaVersionKey schemaVersionKey) { - return new Key(schemaVersionKey); - } - - public static Key of(SchemaIdVersion schemaIdVersion) { - return new Key(schemaIdVersion); - } - - // For JSON serialization/deserialization - private Key() { - - } - - @Override - public String toString() { - return "Key {" + - "schemaVersionKey=" + schemaVersionKey + - ", schemaIdVersion=" + schemaIdVersion + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Key key = (Key) o; - - if (schemaVersionKey != null ? !schemaVersionKey.equals(key.schemaVersionKey) - : key.schemaVersionKey != null) { - return false; - } - return schemaIdVersion != null ? schemaIdVersion.equals(key.schemaIdVersion) : key.schemaIdVersion == null; - } - - @Override - public int hashCode() { - int result = schemaVersionKey != null ? schemaVersionKey.hashCode() : 0; - result = 31 * result + (schemaIdVersion != null ? schemaIdVersion.hashCode() : 0); - return result; - } - } - -} diff --git a/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java b/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java deleted file mode 100644 index cf1e2df9e..000000000 --- a/schema-registry/schema-registry-common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2016-2019 Cloudera, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.hortonworks.registries.schemaregistry; - -import com.google.common.collect.Sets; -import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache; -import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; -import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - -/** - * - */ -public class SchemaVersionInfoCacheTest { - - @Test - public void testSchemaVersionCache() throws Exception { - - final Map schemaIdWithVersionInfo = new HashMap<>(); - final Map schemaKeyWithVersionInfo = new HashMap<>(); - - long schemaMetadataId = 1L; - String schemaName = "schema-1"; - Integer version = 2; - long schemaVersionId = 3L; - SchemaVersionInfo schemaVersionInfo = - new SchemaVersionInfo(schemaVersionId, schemaName, version, schemaMetadataId, "schema-text", - System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); - SchemaIdVersion withVersionId = new SchemaIdVersion(schemaVersionId); - SchemaIdVersion withMetaIdAndVersion = new SchemaIdVersion(schemaMetadataId, version); - SchemaIdVersion withBoth = new SchemaIdVersion(schemaMetadataId, version, schemaVersionId); - SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - schemaKeyWithVersionInfo.put(schemaVersionKey, schemaVersionInfo); - HashSet allIdVersions = Sets.newHashSet(withVersionId, withMetaIdAndVersion, withBoth); - allIdVersions.stream().forEach(x -> schemaIdWithVersionInfo.put(x, schemaVersionInfo)); - - SchemaVersionInfo otherSchemaVersionInfo = new SchemaVersionInfo(schemaVersionId + 1, "other-" + schemaName, version, schemaMetadataId + 1L, - "other-schema-text", System.currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); - SchemaIdVersion otherIdVersion = new SchemaIdVersion(otherSchemaVersionInfo.getId()); - SchemaVersionKey otherSchemaVersionKey = new SchemaVersionKey(otherSchemaVersionInfo.getName(), otherSchemaVersionInfo - .getVersion()); - schemaIdWithVersionInfo.put(otherIdVersion, otherSchemaVersionInfo); - schemaKeyWithVersionInfo.put(otherSchemaVersionKey, otherSchemaVersionInfo); - - - SchemaVersionRetriever schemaRetriever = new SchemaVersionRetriever() { - @Override - public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException { - return schemaKeyWithVersionInfo.get(key); - } - - @Override - public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException { - return schemaIdWithVersionInfo.get(key); - } - }; - - SchemaVersionInfoCache schemaVersionInfoCache = new SchemaVersionInfoCache(schemaRetriever, 32, 60 * 1000L); - - // invalidate key without accessing earlier. - for (SchemaVersionKey versionKey : schemaKeyWithVersionInfo.keySet()) { - schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(versionKey)); - } - - for (SchemaIdVersion schemaIdVersion : schemaIdWithVersionInfo.keySet()) { - schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion)); - } - - // access with all version id keys - for (SchemaIdVersion key : allIdVersions) { - SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(key)); - Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo); - } - - // access with version key - SchemaVersionInfo recvdSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey)); - Assertions.assertEquals(schemaVersionInfo, recvdSchemaVersionInfo); - - // invalidate one kind of schemaIdVersion, and all other combinations should return null - schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(withVersionId)); - for (SchemaIdVersion idVersion : allIdVersions) { - recvdSchemaVersionInfo = schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion)); - Assertions.assertNull(recvdSchemaVersionInfo); - } - - SchemaVersionInfo recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherSchemaVersionKey)); - Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo); - recvdOtherSchemaVersionInfo = schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(otherIdVersion)); - Assertions.assertEquals(otherSchemaVersionInfo, recvdOtherSchemaVersionInfo); - - // all values for these keys should be non existent in cache - for (SchemaIdVersion idVersion : allIdVersions) { - Assertions.assertNull(schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion))); - } - - // all values for these keys should be loaded - for (SchemaIdVersion idVersion : allIdVersions) { - Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(idVersion))); - } - - // all values for these keys should exist locally without loading from target - for (SchemaIdVersion idVersion : allIdVersions) { - Assertions.assertEquals(schemaVersionInfo, schemaVersionInfoCache.getSchemaIfPresent(SchemaVersionInfoCache.Key.of(idVersion))); - } - - } -}