Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for '.' in Bucket Name #84

Merged
merged 3 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions cdi-core/src/main/java/com/linkedin/cdi/source/S3SourceV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.cdi.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +15,7 @@
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -33,6 +35,9 @@ public class S3SourceV2 extends MultistageSource<Schema, GenericRecord> {
private static final String KEY_CONNECTION_TIMEOUT = "connection_timeout";
private static final HashSet<String> S3_REGIONS_SET =
Region.regions().stream().map(region -> region.toString()).collect(Collectors.toCollection(HashSet::new));

private static final String KEY_BUCKET_NAME = "bucket_name";

private S3Keys s3SourceV2Keys = new S3Keys();

public S3Keys getS3SourceV2Keys() {
Expand Down Expand Up @@ -87,11 +92,11 @@ protected void initialize(State state) {
}

// separate the endpoint, which should be a URL without bucket name, from the domain name
s3SourceV2Keys.setEndpoint("https://" + getEndpointFromHost(url.getHost()));
s3SourceV2Keys.setEndpoint("https://" + getEndpoint(parameters, url.getHost()));
s3SourceV2Keys.setPrefix(url.getPath().substring(1));

// separate the bucket name from URI domain name
s3SourceV2Keys.setBucket(url.getHost().split("\\.")[0]);
s3SourceV2Keys.setBucket(getBucketName(parameters, url.getHost()));

s3SourceV2Keys.setFilesPattern(MSTAGE_SOURCE_FILES_PATTERN.get(state));
s3SourceV2Keys.setMaxKeys(MSTAGE_S3_LIST_MAX_KEYS.get(state));
Expand Down Expand Up @@ -130,4 +135,34 @@ private String getEndpointFromHost(String host) {
segments.remove(0);
return Joiner.on(".").join(segments);
}

/**
*
* @param parameters JsonObject containing ms.source.s3.parameters
* @param host hostname with bucket name in the beginning
* @return the bucket name
*/
@VisibleForTesting
protected String getBucketName(JsonObject parameters, String host) {
if (parameters.has(KEY_BUCKET_NAME)) {
return parameters.get(KEY_BUCKET_NAME).getAsString();
}
return host.split("\\.")[0];
}

/**
*
* @param parameters JsonObject containing ms.source.s3.parameters
* @param host hostname with bucket name in the beginning
* @return the endpoint name if bucket name is present in the parameters then removes the bucket name from host and
* calls the getEndpointFromHost method to get the endpoint.
*/
@VisibleForTesting
protected String getEndpoint(JsonObject parameters, String host) {
if (parameters.has(KEY_BUCKET_NAME)) {
String bucketName = parameters.get(KEY_BUCKET_NAME).getAsString().toLowerCase();
host = host.toLowerCase(Locale.ROOT).replace(bucketName, "");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor concern butt I don't see the tolowercase used in the original getEndpointFromHost method. Is there an existing use case that has issues with cases?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are replacing bucket name from the host, if some client enters bucket name in different case as in the source uri. It won't be replaced properly.

}
return getEndpointFromHost(host);
}
}
42 changes: 42 additions & 0 deletions cdi-core/src/test/java/com/linkedin/cdi/source/S3SourceV2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.cdi.source;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.UnsupportedEncodingException;
import org.apache.gobblin.configuration.WorkUnitState;
Expand Down Expand Up @@ -36,4 +37,45 @@ public void testInitialization() throws UnsupportedEncodingException {
Assert.assertEquals(source.getS3SourceV2Keys().getMaxKeys(), new Integer(1000));
Assert.assertEquals(source.getS3SourceV2Keys().getConnectionTimeout(), new Integer(30));
}

@Test
public void testBucketName() {
S3SourceV2 s3SourceV2 = new S3SourceV2();
JsonObject parameteres =
new Gson().fromJson("{\"region\" : \"us-east-2\", \"bucket_name\" : \"collect-us-west-2.tealium.com\"}",
JsonObject.class);
String host = "collect-us-west-2.tealium.com.s3.amazonaws.com";
String bucketName = s3SourceV2.getBucketName(parameteres, host);
Assert.assertEquals(bucketName, "collect-us-west-2.tealium.com");
}

@Test
public void testBucketNameWithoutBucketParameter() {
S3SourceV2 s3SourceV2 = new S3SourceV2();
JsonObject parameteres = new Gson().fromJson("{\"region\" : \"us-east-2\"}", JsonObject.class);
String host = "collect-us-west-2.s3.amazonaws.com";
String bucketName = s3SourceV2.getBucketName(parameteres, host);
Assert.assertEquals(bucketName, "collect-us-west-2");
}

@Test
public void testEndpoint() {
S3SourceV2 s3SourceV2 = new S3SourceV2();
JsonObject parameteres =
new Gson().fromJson("{\"region\" : \"us-east-2\", \"bucket_name\" : \"colleCt-us-west-2.tealium.com\"}",
JsonObject.class);
String host = "collect-us-west-2.tealium.com.s3.amazonaws.com";
String endpoint = s3SourceV2.getEndpoint(parameteres, host);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: parameteres -> parameters

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure with fix the typo.

Assert.assertEquals(endpoint, "s3.amazonaws.com");
}

@Test
public void testEndpointWithoutPeriod() {
S3SourceV2 s3SourceV2 = new S3SourceV2();
JsonObject parameters =
new Gson().fromJson("{\"region\" : \"us-east-2\", \"bucket_name\" : \"collect-us-west-2\"}", JsonObject.class);
String host = "collect-us-west-2.s3.amazonaws.com";
String endpoint = s3SourceV2.getEndpoint(parameters, host);
Assert.assertEquals(endpoint, "s3.amazonaws.com");
}
}
1 change: 1 addition & 0 deletions docs/parameters/ms.source.s3.parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ It can have the following attributes:
- **write_timeout_seconds**: integer, write time out in seconds
- **connection_timeout_seconds**: Sets the socket to timeout after failing to establish a connection with the server after milliseconds.
- **connection_max_idle_millis**: Sets the socket to timeout after timeout milliseconds of inactivity on the socket.
- **bucket_name**: Sets the bucket name, optional if the bucket name doesn't contain any special characters.

### Example

Expand Down
Loading