Skip to content

Commit

Permalink
Enable stream capable state and add a config.json file to resources o…
Browse files Browse the repository at this point in the history
…f strict encrypt

Enable per stream state for the last suite of strict encrypt tests

Final per stream state enable

Add stream capable state to MysqlDatatype

Enable it for mysqlssl
  • Loading branch information
nguyenaiden committed Sep 21, 2023
1 parent 89ba1b9 commit 9aa88a6
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.MySqlUtils;
Expand All @@ -25,7 +26,7 @@ public abstract class AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTe

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
addTestData(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
Expand All @@ -30,10 +31,17 @@
import java.util.HashMap;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class MySqlStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
public EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

Expand All @@ -42,6 +50,7 @@ public class MySqlStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTest

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"host": "default",
"port": 5555,
"database": "default",
"username": "default",
"replication_method": { "method": "STANDARD" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.cursor_based.MySqlCursorBasedStateManager;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.mysql.initialsync.MySqlFeatureFlags;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadHandler;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStreamStateManager;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil;
Expand Down Expand Up @@ -349,7 +348,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final Instant emittedAt) {

final JsonNode sourceConfig = database.getSourceConfig();
final MySqlFeatureFlags mySqlFeatureFlags = new MySqlFeatureFlags(sourceConfig);
if (isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("Using PK + CDC");
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class MySqlFeatureFlags {
public MySqlFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}

private boolean getFlagValue(final String flag) {
return sourceConfig.has(flag) && sourceConfig.get(flag).asBoolean();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.MySqlUtils;
Expand All @@ -26,7 +27,7 @@ public abstract class AbstractMySqlSslCertificateSourceAcceptanceTest extends My

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
addTestData(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

public EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
Expand All @@ -15,9 +16,16 @@
import io.airbyte.integrations.util.HostPortResolver;
import java.util.Map;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class MySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {
@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
Expand All @@ -26,6 +34,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {

@Override
protected Database setupDatabase() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
Expand All @@ -21,6 +22,7 @@ public class MySqlSslSourceAcceptanceTest extends MySqlSourceAcceptanceTest {

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
Expand Down

0 comments on commit 9aa88a6

Please sign in to comment.