Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

W-17312191: Upgrade AWS SDK v2 #1018

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions carbonj.service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ RUN yum update -y && \
yum install -y gcc-c++ gcc make libtool automake autoconf make python3-devel && \
rpm --import http://repos.azulsystems.com/RPM-GPG-KEY-azulsystems && \
yum install -y https://cdn.azul.com/zulu/bin/zulu-repo-1.0.0-1.noarch.rpm && \
yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-conf-6.2.1-1.el9.$(uname -m).rpm && \
yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-libs-6.2.1-1.el9.$(uname -m).rpm && \
yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-conf-6.2.2-6.el9.$(uname -m).rpm && \
yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-libs-6.2.2-6.el9.$(uname -m).rpm && \
#
# If sysstat version is updated, confirm iolog.sh execution and update associated version check in entrypoint.sh
#
Expand Down
8 changes: 6 additions & 2 deletions carbonj.service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,12 @@ dependencies {
implementation group: 'io.netty', name: 'netty-all', version: "${nettyAll}"
implementation group: 'net.razorvine', name: 'pickle', version: "${pickle}"
implementation group: 'org.python', name: 'jython-standalone', version: "${jythonStandalone}"
implementation group: 'com.amazonaws', name: 'amazon-kinesis-client', version: "${amazonKinesisClient}"
implementation group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: "${awsJavaSdkV1}"
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: "${httpClient}"
implementation group: 'software.amazon.kinesis', name: 'amazon-kinesis-client', version: "${awsKinesisClient}"
implementation group: 'software.amazon.awssdk', name: 'kinesis', version: "${awsJavaSdkV2}"
implementation group: 'software.amazon.awssdk', name: 'sts', version: "${awsJavaSdkV2}"
implementation group: 'software.amazon.awssdk', name: 'dynamodb', version: "${awsJavaSdkV2}"
implementation group: 'software.amazon.awssdk', name: 'netty-nio-client', version: "${awsJavaSdkV2}"
implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: "${metrics}"
implementation group: 'io.dropwizard.metrics', name: 'metrics-jvm', version: "${metrics}"
implementation group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: "${metrics}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
import com.demandware.carbonj.service.ns.NamespaceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -25,6 +32,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class Consumers {
Expand All @@ -36,32 +44,40 @@ public class Consumers {
private final PointProcessor pointProcessor;

private final KinesisConfig kinesisConfig;
private final CheckPointMgr<Date> checkPointMgr;

private final ConsumerRules consumerRules;

private final Map<String, KinesisConsumer> consumers;

private final String kinesisConsumerRegion;

private final PointProcessor recoveryPointProcessor;

private final NamespaceCounter namespaceCounter;

private final File indexNameSyncDir;

Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, File rulesFile,
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr, String kinesisConsumerRegion,
NamespaceCounter namespaceCounter, File indexNameSyncDir) {
private final String activeProfile;

private final KinesisAsyncClient kinesisAsyncClient;

private final DynamoDbAsyncClient dynamoDbAsyncClient;

private final CloudWatchAsyncClient cloudWatchAsyncClient;

private final int kinesisConsumerRetroSeconds;

Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, File rulesFile, KinesisConfig kinesisConfig,
NamespaceCounter namespaceCounter, File indexNameSyncDir, String activeProfile,
KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient,
CloudWatchAsyncClient cloudWatchAsyncClient, int kinesisConsumerRetroSeconds) {

this.metricRegistry = metricRegistry;
this.pointProcessor = pointProcessor;
this.recoveryPointProcessor = recoveryPointProcessor;
this.kinesisConfig = kinesisConfig;
this.checkPointMgr = checkPointMgr;
this.kinesisConsumerRegion = kinesisConsumerRegion;
this.namespaceCounter = namespaceCounter;
this.indexNameSyncDir = indexNameSyncDir;
this.activeProfile = activeProfile;
this.kinesisAsyncClient = kinesisAsyncClient;
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
this.cloudWatchAsyncClient = cloudWatchAsyncClient;
this.kinesisConsumerRetroSeconds = kinesisConsumerRetroSeconds;
consumers = new ConcurrentHashMap<>();
consumerRules = new ConsumerRules(rulesFile);
reload();
Expand Down Expand Up @@ -113,6 +129,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
/* create new consumers */
// we use the host name to generate the kinesis application name as they are stable for stable set pods.
String hostName = getHostName();
Date kinesisConsumerRetroDate = new Date(System.currentTimeMillis() - kinesisConsumerRetroSeconds * 1000L);
for (String consumerName : newRules) {
log.info(String.format("Creating new consumer with kinesis stream name: %s", consumerName));

Expand All @@ -139,8 +156,12 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
}

Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName,
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion);
StreamTracker streamTracker = new SingleStreamTracker(kinesisStreamName,
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(kinesisConsumerRetroDate));
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, kinesisApplicationName, kinesisAsyncClient,
dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(),
new KinesisRecordProcessorFactory(metricRegistry, pointProcessor, kinesisConfig, kinesisStreamName));
KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisStreamName, kinesisApplicationName, initRetryCounter, configsBuilder);
log.info(String.format("New Consumer created with name %s", kinesisStreamName));
newConsumers.add(consumerName);
consumers.put(consumerName, kinesisConsumer);
Expand Down Expand Up @@ -171,7 +192,7 @@ private String getHostName() {
}

private String getKinesisApplicationName(String streamName, String hostName) {
return streamName + "-" + hostName;
return streamName + "-" + hostName + "-" + activeProfile;
}

private void close(Set<String> consumerSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@
*/
package com.demandware.carbonj.service.engine;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DynamoDbCheckPointMgr implements CheckPointMgr<Date> {
Expand All @@ -31,71 +33,74 @@ public class DynamoDbCheckPointMgr implements CheckPointMgr<Date> {
private final String tableName;
private final int defaultOffsetMins;

private final AmazonDynamoDB client;
private final DynamoDB dynamoDB;
private final DynamoDbAsyncClient client;
private final int checkPointDynamodbTimout;

public DynamoDbCheckPointMgr(AmazonDynamoDB client, String kinesisApplicationName, int defaultOffsetMins,
int provisionedThroughput) throws Exception {
public DynamoDbCheckPointMgr(DynamoDbAsyncClient client, String kinesisApplicationName, int defaultOffsetMins,
int provisionedThroughput, int checkPointDynamodbTimout) throws Exception {
this.client = client;
this.dynamoDB = new DynamoDB(client);
this.defaultOffsetMins = defaultOffsetMins;
this.tableName = "checkpoints-" + kinesisApplicationName;
if (!DynamoDbUtils.isTablePresent(dynamoDB, tableName)) {
this.checkPointDynamodbTimout = checkPointDynamodbTimout;
if (!DynamoDbUtils.isTablePresent(client, tableName, checkPointDynamodbTimout)) {
createTable(tableName, provisionedThroughput);
}
}

private void createTable(String tableName, int provisionedThroughput) throws Exception {
CreateTableRequest request = new CreateTableRequest()
.withAttributeDefinitions(
new AttributeDefinition("checkPointType", ScalarAttributeType.S))
.withKeySchema(
new KeySchemaElement("checkPointType", KeyType.HASH))
.withProvisionedThroughput(
new ProvisionedThroughput((long)provisionedThroughput, (long)provisionedThroughput))
.withTableName(tableName);
CreateTableRequest request = CreateTableRequest.builder()
.tableName(tableName)
.attributeDefinitions(AttributeDefinition.builder().attributeName("checkPointType").attributeType(ScalarAttributeType.S).build())
.keySchema(KeySchemaElement.builder().attributeName("checkPointType").keyType(KeyType.HASH).build())
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits((long)provisionedThroughput)
.writeCapacityUnits((long)provisionedThroughput)
.build())
.build();
log.info("Issuing CreateTable request for " + tableName);
Table newlyCreatedTable = dynamoDB.createTable(request);
CompletableFuture<CreateTableResponse> createTableResponse = this.client.createTable(request);
log.info("Waiting for " + tableName + " to be created...this may take a while...");
newlyCreatedTable.waitForActive();
createTableResponse.get(checkPointDynamodbTimout, TimeUnit.SECONDS);
}

@Override
public void checkPoint(Date checkPoint) throws Exception {
Table table = dynamoDB.getTable(tableName);

HashMap<String, String> expressionAttributeNames = new HashMap<>();
Map<String, String> expressionAttributeNames = new HashMap<>();
expressionAttributeNames.put("#V", "checkPointValue");

HashMap<String, Object> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(":val1", checkPoint.getTime());
Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(":val1", AttributeValue.builder().n(String.valueOf(checkPoint.getTime())).build());

table.updateItem(
"checkPointType", // key attribute name
"timestamp", // key attribute value
"set #V = :val1", // UpdateExpression
expressionAttributeNames,
expressionAttributeValues);
client.updateItem(UpdateItemRequest.builder()
.tableName(tableName)
.key(Map.of("checkPointType", AttributeValue.builder().s("timestamp").build()))
.updateExpression("set #V = :val1")
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(expressionAttributeValues).build());
}

@Override
public Date lastCheckPoint() throws Exception {
HashMap<String, AttributeValue> keyToGet = new HashMap<String, AttributeValue>();
keyToGet.put( "checkPointType", new AttributeValue( "timestamp") );
GetItemRequest request = new GetItemRequest()
.withKey( keyToGet )
.withTableName( tableName );

Map<String, AttributeValue> item = client.getItem( request ).getItem();
if( item == null ) {

GetItemRequest request = GetItemRequest.builder()
.tableName(tableName)
.key(Map.of("checkPointType", AttributeValue.builder().s("timestamp").build()))
.build();

GetItemResponse getItemResponse = this.client.getItem(request).get(checkPointDynamodbTimout, TimeUnit.SECONDS);

if (!getItemResponse.hasItem()) {
return getDefaultCheckPoint();
}
String value = item.get( "checkPointValue" ).getN();

Map<String, AttributeValue> item = getItemResponse.item();
String value = item.get("checkPointValue").n();
if( value == null ) {
return getDefaultCheckPoint();
}

return new Date( Long.parseLong( value ) );
return new Date(Long.parseLong(value));
}

private Date getDefaultCheckPoint() {
Expand All @@ -104,4 +109,3 @@ private Date getDefaultCheckPoint() {
return checkPoint;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,33 @@
*/
package com.demandware.carbonj.service.engine;

import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DynamoDbUtils {

private static final Logger log = LoggerFactory.getLogger(DynamoDbUtils.class);

public static boolean isTablePresent(DynamoDB dynamoDB, String tableName) {
Table table = dynamoDB.getTable(tableName);
public static boolean isTablePresent(DynamoDbAsyncClient client, String tableName, int checkPointDynamodbTimout) throws InterruptedException {
DescribeTableRequest describeTableRequest = DescribeTableRequest.builder().tableName(tableName).build();

try {
TableDescription tableDescription = table.describe();
return "ACTIVE".equals(tableDescription.getTableStatus());
} catch (ResourceNotFoundException e) {
DescribeTableResponse describeTableResponse = client.describeTable(describeTableRequest)
.get(checkPointDynamodbTimout, TimeUnit.SECONDS);
TableDescription tableDescription = describeTableResponse.table();
return tableDescription.tableStatus() == TableStatus.ACTIVE;
} catch (ExecutionException | TimeoutException e) {
log.warn("kinesis consumer table '" + tableName + "' not found!");
return false;
}
}

public static void deleteTable(DynamoDB dynamoDB, String tableName) throws InterruptedException {
Table table = dynamoDB.getTable(tableName);
table.delete();

log.info("Waiting for " + tableName + " to be deleted...this may take a while...");

table.waitForDelete();
}
}
Loading
Loading