Merge pull request #550 from microsoft/dev

Release 1.14.0

xinlian12 authored Feb 28, 2024
2 parents ecd051c + 00ee1f0 · commit 8c1468d

Showing 4 changed files with 72 additions and 17 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,15 @@
 ## Release History
 
+### 1.14.0 (2024-02-28)
+#### New Features
+* Updated `azure-cosmos` version to 4.56.0.
+
+#### Key Bug Fixes
+* Fixed `NullPointerException` in `CosmosDBSinkConnector` when `TRACE`-level logging is enabled and the `SinkRecord` value is null. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)
+
+#### Other Changes
+* Added more `DEBUG`-level logs in `CosmosDBSourceConnector`. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)
+
 ### 1.13.0 (2024-01-25)
 #### New Features
 * Updated `azure-cosmos` version to 4.54.0.
4 changes: 2 additions & 2 deletions pom.xml
@@ -7,7 +7,7 @@
 
   <groupId>com.azure.cosmos.kafka</groupId>
   <artifactId>kafka-connect-cosmos</artifactId>
-  <version>1.13.0</version>
+  <version>1.14.0</version>
 
   <name> kafka-connect-cosmos</name>
   <url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
@@ -48,7 +48,7 @@
     <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-cosmos</artifactId>
-      <version>4.54.0</version>
+      <version>4.56.0</version>
     </dependency>
     <dependency>
      <groupId>com.jayway.jsonpath</groupId>
CosmosDBSinkTask.java
@@ -104,9 +104,14 @@ public void put(Collection<SinkRecord> records) {
             if (record.key() != null) {
                 MDC.put(String.format("CosmosDbSink-%s", containerName), record.key().toString());
             }
-            logger.trace("Writing record, value type: {}", record.value().getClass().getName());
 
             logger.trace("Key Schema: {}", record.keySchema());
-            logger.trace("Value schema: {}", record.valueSchema());
+            if (record.value() != null) {
+                logger.trace("Writing record, value type: {}", record.value().getClass().getName());
+                logger.trace("Value schema: {}", record.valueSchema());
+            } else {
+                logger.trace("Record value is null");
+            }
 
             Object recordValue;
             if (record.value() instanceof Struct) {
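For context: the value of a Kafka tombstone record is null by design, which is what the previously unguarded `record.value().getClass()` call tripped over. A minimal sketch of such a record; the topic, key, and offset here are illustrative values, not taken from the commit or its tests:

```java
// Minimal sketch: constructing the null-valued (tombstone) SinkRecord that
// previously caused the NullPointerException in the sink task's trace logging.
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneDemo {
    public static void main(String[] args) {
        SinkRecord tombstone = new SinkRecord(
                "hotels",             // topic (illustrative)
                0,                    // partition
                Schema.STRING_SCHEMA, // key schema
                "hotel-1",            // key
                null,                 // value schema
                null,                 // value: null marks a tombstone/delete
                42L);                 // Kafka offset

        // The old trace line dereferenced tombstone.value() unconditionally;
        // with a null value that throws NullPointerException.
        System.out.println("value is null: " + (tombstone.value() == null));
    }
}
```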
CosmosDBSourceTask.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
 import static java.util.Collections.singletonMap;
@@ -62,11 +63,11 @@ public String version() {
 
     @Override
     public void start(Map<String, String> map) {
-        logger.info("Starting CosmosDBSourceTask.");
+        logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName());
         config = new CosmosDBSourceConfig(map);
         this.queue = new LinkedTransferQueue<>();
 
-        logger.info("Creating the client.");
+        logger.info("Worker {} Creating the client.", this.config.getWorkerName());
         client = getCosmosClient(config);
 
         // Initialize the database, feed and lease containers
@@ -101,7 +102,7 @@ public void start(Map<String, String> map) {
             }
         } // Wait for ChangeFeedProcessor to start.
 
-        logger.info("Started CosmosDB source task.");
+        logger.info("Worker {} Started CosmosDB source task.", this.config.getWorkerName());
     }
 
     private String getItemLsn(JsonNode item) {
@@ -121,11 +122,27 @@ public List<SourceRecord> poll() throws InterruptedException {
         while (running.get()) {
             fillRecords(records, topic);
             if (records.isEmpty() || System.currentTimeMillis() > maxWaitTime) {
-                logger.info("Sending {} documents.", records.size());
                 break;
             }
         }
 
+        logger.info("Worker {}, Sending {} documents.", this.config.getWorkerName(), records.size());
+
+        if (logger.isDebugEnabled()) {
+            List<String> recordDetails =
+                records
+                    .stream()
+                    .map(sourceRecord -> String.format("[key %s - offset %s]", sourceRecord.key(), sourceRecord.sourceOffset()))
+                    .collect(Collectors.toList());
+
+            logger.debug(
+                "Worker {}, sending {} documents",
+                this.config.getWorkerName(),
+                recordDetails
+            );
+        }
+
+        logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), true);
         this.shouldFillMoreRecords.set(true);
         return records;
     }
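The `isDebugEnabled()` guard above matters because SLF4J only defers message formatting, not argument construction: without the check, the `recordDetails` list would be materialized on every poll even with DEBUG off. A standalone sketch of the pattern, with illustrative class and data names:

```java
// Standalone sketch of the guard pattern used in poll() above: SLF4J skips
// formatting when DEBUG is off, but the argument — a list built by a stream
// pipeline — would still be computed eagerly without isDebugEnabled().
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardDemo {
    private static final Logger logger = LoggerFactory.getLogger(DebugGuardDemo.class);

    public static void main(String[] args) {
        if (logger.isDebugEnabled()) {
            // Built only when DEBUG is actually enabled.
            List<String> details = IntStream.range(0, 3)
                    .mapToObj(i -> String.format("[key key-%d - offset %d]", i, i * 10))
                    .collect(Collectors.toList());
            logger.debug("Sending {} documents: {}", details.size(), details);
        }
    }
}
```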
@@ -155,9 +172,6 @@ private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException {
             // Get the latest lsn and record as offset
             Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));
 
-            if (logger.isDebugEnabled()) {
-                logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
-            }
             // Convert JSON to Kafka Connect struct and JSON schema
             SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);
 
@@ -176,21 +190,24 @@ private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException {
                 } else {
                     // If the buffer Size exceeds then do not remove the node.
                     if (logger.isDebugEnabled()) {
-                        logger.debug("Adding record back to the queue since adding it exceeds the allowed buffer size {}", config.getTaskBufferSize());
+                        logger.debug(
+                            "Worker {} Adding record back to the queue since adding it exceeds the allowed buffer size {}",
+                            this.config.getWorkerName(),
+                            config.getTaskBufferSize());
                     }
                     this.queue.add(node);
                     break;
                 }
             } catch (Exception e) {
-                logger.error("Failed to fill Source Records for Topic {}", topic);
+                logger.error("Worker {} Failed to fill Source Records for Topic {}", this.config.getWorkerName(), topic);
                 throw e;
             }
         }
     }
 
     @Override
     public void stop() {
-        logger.info("Stopping CosmosDB source task.");
+        logger.info("Worker {} Stopping CosmosDB source task.", this.config.getWorkerName());
         // NOTE: poll() method and stop() method are both called from the same thread,
         // so it is important not to include any changes which may block both places forever
         running.set(false);
@@ -200,18 +217,22 @@ public void stop() {
             changeFeedProcessor.stop().block();
             changeFeedProcessor = null;
         }
+
+        if (this.client != null) {
+            this.client.close();
+        }
     }
 
     private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
-        logger.info("Creating Cosmos Client.");
+        logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());
 
         CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
                 .endpoint(config.getConnEndpoint())
                 .key(config.getConnKey())
                 .consistencyLevel(ConsistencyLevel.SESSION)
                 .contentResponseOnWriteEnabled(true)
                 .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
-                .userAgentSuffix(CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version());
+                .userAgentSuffix(getUserAgentSuffix());
 
         if (config.isGatewayModeEnabled()) {
             cosmosClientBuilder.gatewayMode();
@@ -220,6 +241,10 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
 
         return cosmosClientBuilder.buildAsyncClient();
     }
 
+    private String getUserAgentSuffix() {
+        return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version() + "|" + this.config.getWorkerName();
+    }
+
     private ChangeFeedProcessor getChangeFeedProcessor(
             String hostName,
             CosmosAsyncContainer feedContainer,
@@ -243,11 +268,24 @@ private ChangeFeedProcessor getChangeFeedProcessor(
     }
 
     protected void handleCosmosDbChanges(List<JsonNode> docs) {
+        if (docs != null) {
+            List<String> docIds =
+                docs
+                    .stream()
+                    .map(jsonNode -> jsonNode.get("id") != null ? jsonNode.get("id").asText() : "null")
+                    .collect(Collectors.toList());
+            logger.debug(
+                "handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].",
+                this.config.getWorkerName(),
+                docIds.size(),
+                docIds);
+        }
+
         for (JsonNode document : docs) {
             // Blocks for each transfer till it is processed by the poll method.
             // If we fail before checkpointing then the new worker starts again.
             try {
-                logger.trace("Queuing document");
+                logger.trace("Queuing document {}", document.get("id") != null ? document.get("id").asText() : "null");
 
                 // The item is being transferred to the queue, and the method will only return if the item has been polled from the queue.
                 // The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
@@ -264,6 +302,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
         if (docs.size() > 0) {
             // it is important to flush the current batches to kafka as currently we are using lease container continuationToken for book marking
             // so we would only want to move ahead of the book marking when all the records have been returned to kafka
+            logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), false);
             this.shouldFillMoreRecords.set(false);
         }
     }
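The comments in `handleCosmosDbChanges` describe the hand-off contract between the change feed thread and the task's `poll()` loop: `LinkedTransferQueue.transfer()` blocks until the consumer has taken the element, so the change feed processor cannot run ahead of what `poll()` has returned to Kafka. A standalone sketch of that pattern, with illustrative class and method names rather than the connector's actual ones:

```java
// Standalone sketch of the blocking hand-off described above: the producer
// thread calls transfer(), which blocks until the consumer thread receives
// the element, so the feed only advances once the poll side has the document.
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {
    private final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

    // Change-feed side: blocks per element until the poll side takes it.
    public void handleChanges(List<String> docs) throws InterruptedException {
        for (String doc : docs) {
            queue.transfer(doc);
        }
    }

    // Task side: drains whatever arrives within the timeout into a batch.
    public String pollOne(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```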
