Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Oct 26, 2023
1 parent adae373 commit e56b9a1
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,20 @@ private String getItemLsn(JsonNode item) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();

long maxWaitTime = System.currentTimeMillis() + config.getTaskTimeout();

TopicContainerMap topicContainerMap = config.getTopicContainerMap();
String topic = topicContainerMap.getTopicForContainer(config.getAssignedContainer()).orElseThrow(
() -> new IllegalStateException("No topic defined for container " + config.getAssignedContainer() + "."));

// When we have returned from the fillRecords, it means
// maxBufferSize has reached
// or maxTaskTime has reached
// or all the items from the changeFeed batches has been consumed
fillRecords(records, topic);

while (running.get()) {
fillRecords(records, topic);
if (records.isEmpty() || System.currentTimeMillis() > maxWaitTime || !this.shouldFillMoreRecords.get()) {
logger.info("Sending {} documents.", records.size());
break;
}
}

this.shouldFillMoreRecords.set(true);
return records;
Expand Down

0 comments on commit e56b9a1

Please sign in to comment.