diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CollectionUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CollectionUtils.java index e70fe2185348..bb8e4375b29e 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CollectionUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CollectionUtils.java @@ -19,6 +19,16 @@ public static boolean containsIgnoreCase(final Collection collection, fi return matchingKey(collection, search).isPresent(); } + /** + * Convenience method for when you need to check an entire collection for membership in another collection. + * @param searchCollection the collection you want to check membership in + * @param searchTerms the keys you're looking for + * @return whether all searchTerms are in the searchCollection + */ + public static boolean containsAllIgnoreCase(final Collection searchCollection, final Collection searchTerms) { + return searchTerms.stream().allMatch(term -> containsIgnoreCase(searchCollection, term)); + } + /** * From a collection of strings, return an entry which matches the search term ignoring case * @param collection the collection to search diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java index c35b9de62822..b02e640546a3 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java @@ -5,11 +5,13 @@ package io.airbyte.integrations.base.destination.typing_deduping; import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig; +import org.apache.commons.lang3.StringUtils; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Stream; public interface SqlGenerator { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 79f2e5cff9ca..676de233bd84 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase; import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase; import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey; import static java.util.function.Predicate.not; @@ -177,7 +178,9 @@ public StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtoc @Override public String createTable(final StreamConfig stream, final String suffix) { final String columnDeclarations = columnsAndTypes(stream); - final String clusterConfig = String.join(", ", clusteringColumns(stream)); + final String clusterConfig = clusteringColumns(stream).stream() + .map(c -> StringUtils.wrap(c, QUOTE)) + .collect(joining(", ")); return new StringSubstitutor(Map.of( "final_namespace", stream.id().finalNamespace(QUOTE), @@ -187,7 +190,7 @@ public String createTable(final StreamConfig stream, final String suffix) { """ CREATE SCHEMA IF NOT EXISTS ${final_namespace}; - CREATE TABLE ${final_table_id} ( + CREATE OR REPLACE TABLE ${final_table_id} ( _airbyte_raw_id STRING NOT NULL, _airbyte_extracted_at TIMESTAMP NOT NULL, _airbyte_meta JSON NOT NULL, @@ -204,7 +207,7 @@ private List clusteringColumns(final StreamConfig stream) { // We're doing deduping, therefore we have a primary key. // Cluster on all the PK columns stream.primaryKey().forEach(columnId -> { - clusterColumns.add(columnId.name(QUOTE)); + clusterColumns.add(columnId.name()); }); } clusterColumns.add("_airbyte_extracted_at"); @@ -236,15 +239,16 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, alterTableReport.columnsToAdd(), alterTableReport.columnsToRemove(), alterTableReport.columnsToChangeType(), - alterTableReport.isDestinationV2Format()); + alterTableReport.isDestinationV2Format(), + tableClusteringMatches, + tablePartitioningMatches); return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches; } private boolean clusteringMatches(StreamConfig stream, StandardTableDefinition existingTable) { - return existingTable.getClustering() == null ? false : existingTable.getClustering().getFields().stream() - .map(String::toLowerCase) - .collect(Collectors.toSet()) - .containsAll(clusteringColumns(stream)); + return existingTable.getClustering() == null ? false : containsAllIgnoreCase( + existingTable.getClustering().getFields().stream().collect(Collectors.toSet()), + clusteringColumns(stream)); } private boolean partitioningMatches(StandardTableDefinition existingTable) { @@ -306,7 +310,7 @@ public List softReset(final StreamConfig stream, final TableDefinition e private String clearLoadedAt(StreamId streamId) { return new StringSubstitutor(Map.of("raw_table_id", streamId.rawTableId(QUOTE))) .replace(""" - UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL; + UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1; """); }