Skip to content

Commit

Permalink
apply fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 6, 2024
1 parent 0b5ab01 commit 6e682aa
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
// sync, the
// data is replicated as expected.
@Throws(Exception::class)
fun testCdcAndNonResumableFullRefreshInSameSync() {
protected open fun testCdcAndNonResumableFullRefreshInSameSync() {
val configuredCatalog = Jsons.clone(configuredCatalog)

val MODEL_RECORDS_2: List<JsonNode> =
Expand All @@ -734,7 +734,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
createTableSqlFmt(),
modelsSchema(),
MODELS_STREAM_NAME_2,
columnClause(columns, Optional.of(COL_ID)),
columnClause(columns, Optional.empty()),
)

for (recordJson in MODEL_RECORDS_2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,10 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
setEmittedAtToNull(actualMessages)

val expectedMessages = airbyteMessagesReadOneColumn
Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
val actualRecordMessages = filterRecords(actualMessages)
Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
}

protected open val airbyteMessagesReadOneColumn: List<AirbyteMessage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.SyncMode;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
Expand Down Expand Up @@ -490,15 +491,15 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
throw new RuntimeException(e);
}

recategoriseStreamsForXmin(database, catalog, stateManager);
finalListOfStreamsToBeSyncedViaCtid = finalListOfStreamsToBeSyncedViaCtid.stream()
.filter(streamUnderCheck -> streamUnderCheck.getSyncMode() == SyncMode.INCREMENTAL).collect(toList());

final FileNodeHandler fileNodeHandler =
PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(xminStatus));
ctidStateManager.setFileNodeHandler(fileNodeHandler);
final PostgresCtidHandler ctidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager,
Optional.empty());
Expand Down Expand Up @@ -535,7 +536,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
} else if (isAnyStreamIncrementalSyncMode(catalog)) {
final PostgresCursorBasedStateManager postgresCursorBasedStateManager =
new PostgresCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
recategoriseForCursorBased(database, catalog, stateManager);
recategoriseForCursorBased(database, catalog, stateManager, true);

final FileNodeHandler fileNodeHandler =
PostgresQueryUtils.fileNodeForStreams(database,
Expand All @@ -561,7 +562,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
}

ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair)));
ctidStateManager.setFileNodeHandler(fileNodeHandler);
final PostgresCtidHandler cursorBasedCtidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty());

Expand Down Expand Up @@ -707,7 +707,8 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception {

private void recategoriseStreamsForXmin(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final StateManager stateManager) {
final StateManager stateManager,
final boolean incrementalModeOnly) {
final XminStatus xminStatus;
try {
xminStatus = PostgresQueryUtils.getXminStatus(database);
Expand Down Expand Up @@ -736,11 +737,15 @@ private void recategoriseStreamsForXmin(final JdbcDatabase database,
finalListOfStreamsToBeSyncedViaCtid =
filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), xminStreamsCategorised.ctidStreams());
}
if (incrementalModeOnly) {
finalListOfStreamsToBeSyncedViaCtid = filterIncrementalSyncModeStreams(finalListOfStreamsToBeSyncedViaCtid);
}
}

private void recategoriseForCursorBased(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final StateManager postgresCursorBasedStateManager) {
final StateManager postgresCursorBasedStateManager,
final boolean incrementalModeOnly) {

cursorBasedStreamsCategorised = categoriseStreams(postgresCursorBasedStateManager, catalog);
final ResultWithFailed<List<AirbyteStreamNameNamespacePair>> streamsUnderVacuum = streamsUnderVacuum(database,
Expand All @@ -765,6 +770,13 @@ private void recategoriseForCursorBased(final JdbcDatabase database,
finalListOfStreamsToBeSyncedViaCtid =
filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), cursorBasedStreamsCategorised.ctidStreams());
}
if (incrementalModeOnly) {
finalListOfStreamsToBeSyncedViaCtid = filterIncrementalSyncModeStreams(finalListOfStreamsToBeSyncedViaCtid);
}
}

private List<ConfiguredAirbyteStream> filterIncrementalSyncModeStreams(final List<ConfiguredAirbyteStream> allStreams) {
return allStreams.stream().filter(streamUnderCheck -> streamUnderCheck.getSyncMode() == SyncMode.INCREMENTAL).collect(toList());
}

@Override
Expand All @@ -783,21 +795,23 @@ protected void initializeForStateManager(final JdbcDatabase database,
ctidStateManager = getCtidInitialLoadGlobalStateManager(database, catalog, stateManager, getQuoteString(), savedOffsetAfterReplicationSlotLSN);
} else {
if (isXmin(sourceConfig)) {
recategoriseStreamsForXmin(database, catalog, stateManager);
recategoriseStreamsForXmin(database, catalog, stateManager, /* incrementalOnly= */false);
final FileNodeHandler fileNodeHandler =
PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());
ctidStateManager = new CtidPerStreamStateManager(xminStreamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler);
ctidStateManager.setFileNodeHandler(fileNodeHandler);
} else {
recategoriseForCursorBased(database, catalog, stateManager);
recategoriseForCursorBased(database, catalog, stateManager, /* incrementalOnly= */false);
final FileNodeHandler fileNodeHandler =
PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

ctidStateManager =
new CtidPerStreamStateManager(cursorBasedStreamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler);
ctidStateManager.setFileNodeHandler(fileNodeHandler);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa
@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) {
// resumeable full refresh
// resumeable full refresh for cursor based stream.
var ctidStatus = pairToCtidStatus.get(pair);
return createCtidStateMessage(pair, ctidStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ private void assertStateTypes(final List<? extends AirbyteStateMessage> stateMes
}
}

@Override
@Test
protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {}

@Override
protected void assertStateMessagesForNewTableSnapshotTest(final List<? extends AirbyteStateMessage> stateMessages,
final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ void testReadIncrementalSuccessWithFullRefresh() throws Exception {
// we need to get the last state for the "incremental stream", which is not necessarily the last
// state message of the batch.
final AirbyteStateMessage lastEmittedState = getLastStateMessageOfStream(stateAfterFirstBatch, STREAM_NAME);

final JsonNode state = Jsons.jsonNode(List.of(lastEmittedState));

testdb.query(ctx -> {
Expand Down

0 comments on commit 6e682aa

Please sign in to comment.