Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Aug 22, 2023
1 parent ea02ea4 commit b44af98
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Stream;
Expand All @@ -30,6 +31,7 @@
public class CtidIterator extends AbstractIterator<RowDataWithCtid> implements AutoCloseableIterator<RowDataWithCtid> {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidIterator.class);
private static final int MAX_ALLOWED_RESYNCS = 5;
private static final int QUERY_TARGET_SIZE_GB = 1;
private static final double MEGABYTE = Math.pow(1024, 2);
private static final double GIGABYTE = MEGABYTE * 1024;
Expand All @@ -39,14 +41,17 @@ public class CtidIterator extends AbstractIterator<RowDataWithCtid> implements A
private final List<String> columnNames;
private final CtidStateManager ctidStateManager;
private final JdbcDatabase database;
private final Map<AirbyteStreamNameNamespacePair, Long> fileNodes;
private final String quoteString;
private final String schemaName;
private final CtidPostgresSourceOperations sourceOperations;
private final Queue<Pair<Ctid, Ctid>> subQueriesPlan;
private final String tableName;
private final long tableSize;
private AutoCloseableIterator<RowDataWithCtid> currentIterator;

private Long lastKnownFileNode;
private int numberOfTimesReSynced = 0;

public CtidIterator(final CtidStateManager ctidStateManager,
final JdbcDatabase database,
Expand All @@ -56,12 +61,14 @@ public CtidIterator(final CtidStateManager ctidStateManager,
final String schemaName,
final String tableName,
final long tableSize,
final long blockSize) {
final long blockSize,
final Map<AirbyteStreamNameNamespacePair, Long> fileNodes) {
this.airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
this.blockSize = blockSize;
this.columnNames = columnNames;
this.ctidStateManager = ctidStateManager;
this.database = database;
this.fileNodes = fileNodes;
this.quoteString = quoteString;
this.schemaName = schemaName;
this.sourceOperations = sourceOperations;
Expand Down Expand Up @@ -92,9 +99,15 @@ protected RowDataWithCtid computeNext() {
fileNode,
lastKnownFileNode,
airbyteStream);
if (numberOfTimesReSynced > MAX_ALLOWED_RESYNCS) {
throw new RuntimeException("Airbyte has tried re-syncing stream " + airbyteStream + " more than " + MAX_ALLOWED_RESYNCS
+ " times but VACUUM is still happening in between the sync, Please reach out to the customer to understand their VACUUM frequency.");
}
subQueriesPlan.clear();
subQueriesPlan.addAll(ctidQueryPlan(Ctid.of(0, 0),
tableSize, blockSize, QUERY_TARGET_SIZE_GB));
fileNodes.put(airbyteStream, fileNode);
numberOfTimesReSynced++;
} else {
LOGGER.info("The latest file node {} for stream {} is equal to the last known file node {} known to Airbyte.",
fileNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.OffsetDateTime;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.CheckForNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -35,7 +36,7 @@ public class CtidStateIterator extends AbstractIterator<AirbyteMessage> implemen
private boolean hasEmittedFinalState;
private String lastCtid;
private final JsonNode streamStateForIncrementalRun;
private final long relationFileNode;
private final Function<AirbyteStreamNameNamespacePair, Long> relationFileNode;
private final CtidStateManager stateManager;
private long recordCount = 0L;
private Instant lastCheckpoint = Instant.now();
Expand All @@ -44,7 +45,7 @@ public class CtidStateIterator extends AbstractIterator<AirbyteMessage> implemen

public CtidStateIterator(final Iterator<AirbyteMessageWithCtid> messageIterator,
final AirbyteStreamNameNamespacePair pair,
final long relationFileNode,
final Function<AirbyteStreamNameNamespacePair, Long> relationFileNode,
final CtidStateManager stateManager,
final JsonNode streamStateForIncrementalRun,
final Duration checkpointDuration,
Expand All @@ -70,7 +71,7 @@ protected AirbyteMessage computeNext() {
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withIncrementalState(streamStateForIncrementalRun)
.withRelationFilenode(relationFileNode);
.withRelationFilenode(relationFileNode.apply(pair));
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
recordCount = 0L;
lastCheckpoint = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private AutoCloseableIterator<RowDataWithCtid> queryTableCtid(
final long blockSize) {

LOGGER.info("Queueing query for table: {}", tableName);
return new CtidIterator(ctidStateManager, database, sourceOperations, quoteString, columnNames, schemaName, tableName, tableSize, blockSize);
return new CtidIterator(ctidStateManager, database, sourceOperations, quoteString, columnNames, schemaName, tableName, tableSize, blockSize, fileNodes);
}

// Transforms the given iterator to create an {@link AirbyteRecordMessage}
Expand Down Expand Up @@ -169,9 +169,14 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseab
final JsonNode incrementalState =
(currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
final Long latestFileNode = fileNodes.get(pair);
assert latestFileNode != null;

/**
* We use a function and not the direct value cause the fileNodes Map can be updated by {@link CtidIterator} if it identifies a VACUUM in the middle of the sync.
*/
final Function<AirbyteStreamNameNamespacePair, Long> latestFileNode = (p) -> {
final Long fileNode = fileNodes.get(p);
assert fileNode != null;
return fileNode;
};
final Duration syncCheckpointDuration =
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
: CtidStateIterator.SYNC_CHECKPOINT_DURATION;
Expand Down

0 comments on commit b44af98

Please sign in to comment.