Skip to content

Latest commit

 

History

History
132 lines (91 loc) · 4.52 KB

README_Source.md

File metadata and controls

132 lines (91 loc) · 4.52 KB

Kafka Connect Cosmos DB Source Connector

The Azure Cosmos DB Source connector provides the capability to read data from the Cosmos DB Change Feed and publish this data to a Kafka topic.

Installation

Install Connector Manually

Download and extract the ZIP file for your connector and follow the manual connector installation instructions

Configuration

At the moment the following settings can be configured by means of the connector.properties file. For a config file containing default settings see this example.

All configuation properties for the source connector are prefixed with connect.cosmosdb. e.g. connect.cosmosdb.databasename

Name Description Type Default Valid Values Importance
databasename name of the database to write to string
master.key the configured master key for Cosmos DB string
endpoint the endpoint for the Cosmos DB Account uri
task.poll.interval int

Kafka Connect Converter Configuration

Data will always be read from Cosmos DB as JSON.

The key.converter and value.converter configuration should be set according to how you want the data serialized when written to the Kafka topic.

If the data in Cosmos DB contains the schema embedded in the document and it is in the following format -

then you can configure the value.converter to use JSON with Schema by setting the following configuration:

It is possible to have the Source connector output CSV string by using StringConverter as follows:

Quick Start

Prerequisites

Create Azure Cosmos DB Instance, Database and Collection

Create a new Azure Resource Group for this quickstart, then add to it a Cosmos DB Account, Database and Collection using the Azure CLI

# create cosmosdb account

# create database

# create collection

Install connector

# install the connector (run from your CP installation directory)

# start conluent platform
confluent local start

Insert document in to Cosmos DB

Insert a new document in to Cosmos DB using the Azure CLI

Verify the record is in Cosmos DB

Load the connector

Create azure-cosmosdb.json file with the following contents:

{
  "name": "azure-cosmosdb",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.microsoft.azure.cosmosdb.kafka.connect.source.CosmosDBSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}

Load the Azure Cosmos DB Source Connector

confluent local load azure-cosmosdb -- -d path/to/azure-cosmosdb.json

Confirm that the connector is in a RUNNING state.

confluent local status azure-cosmosdb

Confirm that the messages were delivered to the result topic in Kafka

Cleanup

Delete the connector

confluent local unload azure-cosmosdb

Stop Confluent Platform

confluent local stop

Delete the created Azure Cosmos DB service and its resource group using Azure CLI.