Skip to content

Commit

Permalink
added support for custom templates
Browse files Browse the repository at this point in the history
  • Loading branch information
hifly81 committed Sep 16, 2024
1 parent 4d7d8b6 commit 53f516f
Show file tree
Hide file tree
Showing 17 changed files with 306 additions and 33 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tear-down.sh
JR Source Connector can be configured with:

- _**template**_: A valid JR existing template name. For a list of available templates see: https://jrnd.io/docs/#listing-existing-templates
- _**embedded_template**_: Location of a file containing a valid custom JR template. This property will take precedence over _template_. File must exist on Kafka Connect Worker nodes.
- _**topic**_: target topic
- _**frequency**_: Repeat the creation of a random object every X milliseconds.
- _**objects**_: Number of objects to create at every run. Default is 1.
Expand Down Expand Up @@ -252,6 +253,50 @@ message shopping_rating {
}
```

### Custom template

A JR connector job with a custom template will be instantiated and produce 5 new random messages to _customer_ topic every 5 seconds, using the _Confluent Schema Registry_ to register the _Avro_ schema.
Template definition is loaded from file _/tmp/customer-template.json_.

```
{
"name" : "jr-avro-custom-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"embedded_template" : "/tmp/customer-template.json",
"topic": "customer",
"frequency" : 5000,
"objects": 5,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1
}
}
```

Consume from _customer_ topic:

```
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic customer --from-beginning --property schema.registry.url=http://localhost:8081
{"customer_id":"6775933f-89c2-43b0-9eaf-e52e5f23293c","first_name":"Cynthia","last_name":"Foster","email":"cynthia.foster@hotmail.com","phone_number":"623 27678252","street_address":"Louisville, Cedar Lane 99, 21401","state":"Massachusetts","zip_code":"21401","country":"United States","country_code":"US"}
{"customer_id":"a15f891e-a3e7-4720-bf59-28202596c667","first_name":"Zachary","last_name":"Harris","email":"zachary.harris@aol.com","phone_number":"747 95821702","street_address":"Austin, River Road 8, 99801","state":"Illinois","zip_code":"99801","country":"United States","country_code":"US"}
{"customer_id":"8906111f-d6d3-4115-bd1a-3e231e3caaa2","first_name":"Julie","last_name":"Long","email":"julie.long@email.com","phone_number":"718 08720661","street_address":"Raleigh, Peachtree Street 43, 58501","state":"Georgia","zip_code":"58501","country":"United States","country_code":"US"}
{"customer_id":"9864ef53-eadf-4012-9cd0-c79e755169df","first_name":"Bryan","last_name":"Wilson","email":"bryan.wilson@mac.com","phone_number":"984 61669636","street_address":"San Antonio, Juniper Drive 23, 17101","state":"Illinois","zip_code":"17101","country":"United States","country_code":"US"}
{"customer_id":"a57911e5-dc9e-4da4-b280-1c0b0143538e","first_name":"Charles","last_name":"Thompson","email":"charles.thompson@gmail.com","phone_number":"726 39040449","street_address":"Richmond, Hillcrest Road 6, 43215","state":"Indiana","zip_code":"43215","country":"United States","country_code":"US"}
```

Show the _Avro_ schema registered:

```
curl -v http://localhost:8081/subjects/customer-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json
{"type":"record","name":"recordRecord","fields":[{"name":"customer_id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"phone_number","type":"string"},{"name":"street_address","type":"string"},{"name":"state","type":"string"},{"name":"zip_code","type":"string"},{"name":"country","type":"string"},{"name":"country_code","type":"string"}],"connect.name":"recordRecord"}
```

## Install the connector

- Download and extract the ZIP file from [releases](https://github.com/jrnd-io/jr-kafka-connect-source/releases).
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>io.jrnd</groupId>
<version>0.0.9</version>
<version>0.0.10</version>
<artifactId>jr-kafka-connect-source</artifactId>
<packaging>jar</packaging>

Expand Down
2 changes: 1 addition & 1 deletion quickstart/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.9
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.10

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
Expand Down
2 changes: 1 addition & 1 deletion quickstart/Dockerfile-arm64
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.9
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.10

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
Expand Down
2 changes: 1 addition & 1 deletion quickstart/build-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

DOCKERFILE=quickstart/Dockerfile
IMAGE_NAME=jrndio/kafka-connect-demo-image
IMAGE_VERSION=0.0.9
IMAGE_VERSION=0.0.10

if [[ $(uname -m) == 'arm64' ]]; then
DOCKERFILE=quickstart/Dockerfile-arm64
Expand Down
13 changes: 13 additions & 0 deletions quickstart/config/customer-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

{
"customer_id": "{{uuid}}",
"first_name": "{{name}}",
"last_name": "{{surname}}",
"email": "{{email}}",
"phone_number": "{{phone}}",
"street_address": "{{city}}, {{street}} {{building 2}}, {{zip}}",
"state": "{{state}}",
"zip_code": "{{zip}}",
"country": "United States",
"country_code": "US"
}
13 changes: 13 additions & 0 deletions quickstart/config/jr-source.avro.custom.quickstart.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"name" : "jr-avro-custom-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"embedded_template" : "/tmp/customer-template.json",
"topic": "customer",
"frequency" : 5000,
"objects": 5,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1
}
}
4 changes: 3 additions & 1 deletion quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

connect:
image: jrndio/kafka-connect-demo-image:0.0.9
image: jrndio/kafka-connect-demo-image:0.0.10
hostname: connect
container_name: connect
depends_on:
- schema-registry
ports:
- "8083:8083"
volumes:
- ./config/customer-template.json:/tmp/customer-template.json
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
Expand Down
2 changes: 1 addition & 1 deletion src/assembly/manifest.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name" : "jr-source-connector",
"version" : "0.0.9",
"version" : "0.0.10",
"title" : "JR Source Connector",
"description" : "A Kafka Connector for JR, the leading streaming quality data generator.",
"owner" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.jrnd.kafka.connect.connector;

import io.jrnd.kafka.connect.connector.model.Template;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,7 +87,7 @@ public List<String> templates() {
}

public List<String> runTemplate(
String template,
Template templateWrapper,
int objects,
String keyField,
int keyValueLength) {
Expand All @@ -101,13 +102,13 @@ public List<String> runTemplate(

if(keyField == null || keyField.isEmpty()) {
commandBuilder.append(" run ");
commandBuilder.append(template);
commandBuilder.append(templateWrapper.isEmbedded()? "--embedded '" + templateWrapper.getTemplate() + "'":templateWrapper.getTemplate());
commandBuilder.append(" -n ");
commandBuilder.append(objects);
}
else {
commandBuilder.append(" run ");
commandBuilder.append(template);
commandBuilder.append(templateWrapper.isEmbedded()? "--embedded '" + templateWrapper.getTemplate() + "'":templateWrapper.getTemplate());
commandBuilder.append(" --key '{{key " + "\"{\\\"");
commandBuilder.append(keyField);
commandBuilder.append("\\\":\" ");
Expand All @@ -120,6 +121,9 @@ public List<String> runTemplate(

}

if (LOG.isDebugEnabled())
LOG.debug("JR command to execute {}", commandBuilder);

processBuilder.command(
CommandInterpeter.getInstance().getCommand(),
CommandInterpeter.getInstance().getArguments(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,6 +41,7 @@
public class JRSourceConnector extends SourceConnector {

public static final String JR_EXISTING_TEMPLATE = "template";
public static final String EMBEDDED_TEMPLATE = "embedded_template";
public static final String JR_EXECUTABLE_PATH = "jr_executable_path";
public static final String TOPIC_CONFIG = "topic";
public static final String POLL_CONFIG = "frequency";
Expand All @@ -50,6 +55,7 @@ public class JRSourceConnector extends SourceConnector {

private String topic;
private String template;
private String embeddedTemplate;
private Long pollMs;
private Integer objects;
private String keyField;
Expand All @@ -60,6 +66,7 @@ public class JRSourceConnector extends SourceConnector {

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.")
.define(EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Location of a file containing a valid custom JR template. This property will take precedence over 'template'.")
.define(TOPIC_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Topics to publish data to.")
.define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every X milliseconds.")
.define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.")
Expand All @@ -68,7 +75,6 @@ public class JRSourceConnector extends SourceConnector {
.define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.")
.define(VALUE_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "one between org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter")
.define(KEY_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "org.apache.kafka.connect.storage.StringConverter");
;

private static final Logger LOG = LoggerFactory.getLogger(JRSourceConnector.class);

Expand All @@ -80,21 +86,33 @@ public void start(Map<String, String> map) {
jrExecutablePath = parsedConfig.getString(JR_EXECUTABLE_PATH);
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);

//check list of available templates
List<String> templates = jrCommandExecutor.templates();
if(templates.isEmpty())
throw new ConfigException("JR template list is empty");

template = parsedConfig.getString(JR_EXISTING_TEMPLATE);
if(template == null || template.isEmpty())
template = DEFAULT_TEMPLATE;
embeddedTemplate = parsedConfig.getString(EMBEDDED_TEMPLATE);
if (embeddedTemplate != null && !embeddedTemplate.isEmpty()) {
try {
embeddedTemplate = readFileToString(embeddedTemplate);
embeddedTemplate = embeddedTemplate.replaceAll("[\\n\\r]", "");
} catch (IOException e) {
throw new RuntimeException("can't read template from file.");
}
}

if(!templates.contains(template))
throw new ConfigException("'template' must be a valid JR template");
if((embeddedTemplate == null || embeddedTemplate.isEmpty())) {
template = parsedConfig.getString(JR_EXISTING_TEMPLATE);
if(template == null || template.isEmpty())
template = DEFAULT_TEMPLATE;

//list of available templates
List<String> templates = jrCommandExecutor.templates();
if(templates.isEmpty())
throw new ConfigException("JR template list is empty.");
if(!templates.contains(template)) {
throw new ConfigException("'template' must be a valid JR template.");
}
}

List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
if (topics == null || topics.size() != 1) {
throw new ConfigException("'topic' configuration requires definition of a single topic");
throw new ConfigException("'topic' configuration requires definition of a single topic.");
}
topic = topics.get(0);

Expand All @@ -115,8 +133,8 @@ public void start(Map<String, String> map) {
valueConverter = StringConverter.class.getName();

if (LOG.isInfoEnabled())
LOG.info("Config: template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
template, topic, pollMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
template, embeddedTemplate, topic, pollMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
}

@Override
Expand All @@ -128,7 +146,10 @@ public Class<? extends Task> taskClass() {
public List<Map<String, String>> taskConfigs(int i) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
Map<String, String> config = new HashMap<>();
config.put(JR_EXISTING_TEMPLATE, template);
if(template != null && !template.isEmpty())
config.put(JR_EXISTING_TEMPLATE, template);
if(embeddedTemplate != null && !embeddedTemplate.isEmpty())
config.put(EMBEDDED_TEMPLATE, embeddedTemplate);
config.put(TOPIC_CONFIG, topic);
config.put(POLL_CONFIG, String.valueOf(pollMs));
config.put(OBJECTS_CONFIG, String.valueOf(objects));
Expand Down Expand Up @@ -157,6 +178,11 @@ public String version() {
return null;
}

private String readFileToString(String filePath) throws IOException {
Path path = Paths.get(filePath);
return Files.readString(path);
}

public Integer getObjects() {
return objects;
}
Expand All @@ -169,6 +195,10 @@ public String getTemplate() {
return template;
}

public String getEmbeddedTemplate() {
return embeddedTemplate;
}

public String getTopic() {
return topic;
}
Expand Down
Loading

0 comments on commit 53f516f

Please sign in to comment.