diff --git a/pom.xml b/pom.xml
index 34c0b0b..63ded66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
org.apache.kafka
connect-api
- 0.11.0.2
+ 1.0.0
com.amazonaws
@@ -37,6 +37,11 @@
6.9.10
test
+
+ com.amazonaws
+ aws-java-sdk-sts
+ 1.11.265
+
diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
index 7adcdd5..beeb6b2 100644
--- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
+++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
@@ -11,6 +11,7 @@
import org.apache.kafka.connect.sink.SinkConnector;
public class AmazonKinesisSinkConnector extends SinkConnector {
+ public static final String STS_SESSION_NAME_DEFAULT = "AmazonKinesisSink";
public static final String REGION = "region";
@@ -46,6 +47,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
public static final String SLEEP_CYCLES = "sleepCycles";
+ public static final String PRODUCER_ROLE = "producerRole";
+
+ public static final String STS_SESSION_NAME = "stsSessionName";
+
private String region;
private String streamName;
@@ -80,6 +85,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
private String sleepCycles;
+ private String producerRole;
+
+ private String stsSessionName;
+
@Override
public void start(Map props) {
region = props.get(REGION);
@@ -99,6 +108,8 @@ public void start(Map props) {
outstandingRecordsThreshold = props.get(OUTSTANDING_RECORDS_THRESHOLD);
sleepPeriod = props.get(SLEEP_PERIOD);
sleepCycles = props.get(SLEEP_CYCLES);
+ producerRole = props.get(PRODUCER_ROLE);
+ stsSessionName = props.getOrDefault(STS_SESSION_NAME, STS_SESSION_NAME_DEFAULT);
}
@Override
@@ -198,6 +209,12 @@ public List