Skip to content

Commit

Permalink
Merge SFTP protocol and default factory
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Aug 13, 2021
1 parent 9365e8b commit ffa3d4a
Show file tree
Hide file tree
Showing 41 changed files with 1,081 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.linkedin.cdi.factory.DefaultConnectionClientFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.State;
import com.linkedin.cdi.factory.DefaultS3ClientFactory;


/**
Expand Down Expand Up @@ -168,6 +168,18 @@ public <T> T getDefaultValue() {
return (T) ",";
}
},
/**
* By default, CsvExtractor tries to infer the true type of fields when inferring the schema.
* However, in some cases the inference is not accurate, and users may prefer to keep all fields as strings.
* In that case, set ms.csv.default.field.type = string.
* Supported types: string | int | long | double | boolean | float
*/
MSTAGE_CSV_DEFAULT_FIELD_TYPE("ms.csv.default.field.type", String.class) {
@Override
public <T> T getDefaultValue() {
return (T) StringUtils.EMPTY;
}
},
/**
* If csv.column.header is true, csv.skip.lines will be 1 by default. If more than 1
* row needs to be skipped, set this parameter explicitly.
Expand Down Expand Up @@ -345,13 +357,13 @@ public Long getMillis(State state) {
}
},
/**
* http.client.factory define an indirect way to specify the type of HttpClient to use.
* default = {@link com.linkedin.cdi.factory.ApacheHttpClientFactory}
* Define an indirect way to specify the type of connection clients
* default = {@link DefaultConnectionClientFactory}
*/
MSTAGE_HTTP_CLIENT_FACTORY("ms.http.client.factory", String.class) {
MSTAGE_CONNECTION_CLIENT_FACTORY("ms.connection.client.factory", String.class) {
@Override
public <T> T getDefaultValue() {
return (T) "com.linkedin.cdi.factory.ApacheHttpClientFactory";
return (T) "com.linkedin.cdi.factory.DefaultConnectionClientFactory";
}
},
/**
Expand Down Expand Up @@ -400,17 +412,6 @@ public <T> T getDefaultValue() {
* Currently, we don't allow exceptions being made to revert errors by using reason code.
*/
MSTAGE_HTTP_STATUS_REASONS("ms.http.status.reasons", JsonObject.class),
/**
* jdbc.client.factory define an indirect way to specify the type of JDBC Client to use.
* default = {@link com.linkedin.cdi.factory.DefaultJdbcClientFactory}
*/
MSTAGE_JDBC_CLIENT_FACTORY("ms.jdbc.client.factory", String.class) {
@Override
public <T> T getDefaultValue() {
return (T) "com.linkedin.cdi.factory.DefaultJdbcClientFactory";
}
},

MSTAGE_JDBC_SCHEMA_REFACTOR("ms.jdbc.schema.refactor", String.class) {
@Override
public <T> T getDefaultValue() {
Expand Down Expand Up @@ -538,16 +539,6 @@ public <T> T getDefaultValue() {
return (T) retention;
}
},
/**
* s3.client.factory define an indirect way to specify the type of S3 Client to use.
* default = {@link DefaultS3ClientFactory}
*/
MSTAGE_S3_CLIENT_FACTORY("ms.s3.client.factory", String.class) {
@Override
public <T> T getDefaultValue() {
return (T) "com.linkedin.cdi.factory.DefaultS3ClientFactory";
}
},
/**
* Schema cleansing will replace special characters in the schema element names based
* on a pattern. By default it will replace all blank spaces, $, and @ to underscores.
Expand Down Expand Up @@ -764,6 +755,27 @@ public <T> T getDefaultValue() {
*/
MSTAGE_WATERMARK("ms.watermark", JsonArray.class),
MSTAGE_WATERMARK_GROUPS("ms.watermark.groups", JsonArray.class),
/**
* Minimum number of records that must be present for the work unit to succeed.
* If the record count is below this minimum, the work unit fails.
*/
MSTAGE_WORK_UNIT_MIN_RECORDS("ms.work.unit.min.records", Long.class) {
@Override
public <T> T getDefaultValue() {
return (T) Long.valueOf(0);
}
},
/**
* Minimum number of work units that must be present for the job to proceed.
* If the number of work units is below this minimum, the job fails. This parameter
* should be used only when there is a unit watermark.
*/
MSTAGE_WORK_UNIT_MIN_UNITS("ms.work.unit.min.units", Long.class) {
@Override
public <T> T getDefaultValue() {
return (T) Long.valueOf(0);
}
},
MSTAGE_WORK_UNIT_PARALLELISM_MAX("ms.work.unit.parallelism.max", Integer.class) {
@Override
public boolean validateNonblank(State state) {
Expand Down Expand Up @@ -830,6 +842,16 @@ public <T> T getDefaultValue() {
return (T) Long.valueOf(500L);
}
},
/**
* Boolean switch read from ms.audit.enabled; the default is false.
* NOTE(review): presumably turns on auditing that uses the ms.kafka.* properties
* declared right after this one (brokers, schema registry URL, client id, audit topic) — confirm.
*/
MSTAGE_AUDIT_ENABLED("ms.audit.enabled", Boolean.class) {
@Override
public <T> T getDefaultValue() {
return (T) Boolean.FALSE;
}
},
MSTAGE_KAFKA_BROKERS("ms.kafka.brokers", String.class),
MSTAGE_KAFKA_SCHEMA_REGISTRY_URL("ms.kafka.schema.registry.url", String.class),
MSTAGE_KAFKA_CLIENT_ID("ms.kafka.clientId", String.class),
MSTAGE_KAFKA_TOPIC_NAME("ms.kafka.audit.topic.name", String.class),
// Properties defined in Gobblin, redefine here to leverage the new features like validation
CONVERTER_CLASSES("converter.classes", String.class),
DATASET_URN_KEY("dataset.urn", String.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,20 @@ public interface StaticConstants {
String KEY_WORD_FAIL = "fail";
String KEY_WORD_SUCCESS = "success";
String KEY_WORD_ERROR_COLUMN = "errorColumn";
String KEY_WORD_INT = "int";
String KEY_WORD_LONG = "long";
String KEY_WORD_DOUBLE = "double";
String KEY_WORD_FLOAT = "float";
String KEY_WORD_JSON = "json";
String KEY_WORD_CSV = "csv";
String KEY_WORD_AVRO = "avro";

String EXCEPTION_WORK_UNIT_MINIMUM = "Job requires a minimum of %s work unit(s) to proceed because ms.work.unit.min.units = %s.";
String EXCEPTION_RECORD_MINIMUM = "Work unit requires a minimum of %s record(s) to succeed because ms.work.unit.min.records = %s.";

String MSG_ROWS_PROCESSED = "Processed %s records, work unit: %s";
String MSG_WORK_UNIT_ALWAYS = "There should be a work unit.";
String MSG_LOW_WATER_MARK_ALWAYS = "There should be a low watermark.";
String MSG_WORK_UNIT_INFO = "Generating Work Unit: %s, watermark: %s";
Gson GSON = new Gson();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.gobblin.configuration.State;
import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.factory.HttpClientFactory;
import com.linkedin.cdi.factory.ConnectionClientFactory;
import com.linkedin.cdi.keys.ExtractorKeys;
import com.linkedin.cdi.keys.HttpKeys;
import com.linkedin.cdi.keys.JobKeys;
Expand Down Expand Up @@ -79,9 +79,9 @@ synchronized HttpClient getHttpClient(State state) {
if (httpClient == null) {
try {
Class<?> factoryClass = Class.forName(
MultistageProperties.MSTAGE_HTTP_CLIENT_FACTORY.getValidNonblankWithDefault(state));
HttpClientFactory factory = (HttpClientFactory) factoryClass.newInstance();
httpClient = factory.get(state);
MultistageProperties.MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.newInstance();
httpClient = factory.getHttpClient(state);
} catch (Exception e) {
log.error("Error creating HttpClient: {}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.linkedin.cdi.factory.ConnectionClientFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -26,7 +27,6 @@
import org.apache.gobblin.configuration.State;
import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.factory.JdbcClientFactory;
import com.linkedin.cdi.keys.ExtractorKeys;
import com.linkedin.cdi.keys.JdbcKeys;
import com.linkedin.cdi.keys.JobKeys;
Expand Down Expand Up @@ -101,10 +101,10 @@ public WorkUnitStatus executeNext(WorkUnitStatus workUnitStatus) throws Retriabl
*/
private synchronized Connection getJdbcConnection(State state) {
try {
Class<?> factoryClass = Class.forName(MultistageProperties.MSTAGE_JDBC_CLIENT_FACTORY.getValidNonblankWithDefault(state));
JdbcClientFactory factory = (JdbcClientFactory) factoryClass.newInstance();
Class<?> factoryClass = Class.forName(MultistageProperties.MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.newInstance();

return factory.getConnection(
return factory.getJdbcConnection(
jdbcSourceKeys.getSourceUri(),
MultistageProperties.SOURCE_CONN_USERNAME.getValidNonblankWithDefault(state),
MultistageProperties.SOURCE_CONN_PASSWORD.getValidNonblankWithDefault(state),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.linkedin.cdi.connection;

import com.google.common.collect.Lists;
import com.linkedin.cdi.factory.ConnectionClientFactory;
import java.net.URI;
import java.time.Duration;
import java.util.List;
Expand All @@ -16,7 +17,6 @@
import org.apache.gobblin.configuration.State;
import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.factory.S3ClientFactory;
import com.linkedin.cdi.keys.ExtractorKeys;
import com.linkedin.cdi.keys.JobKeys;
import com.linkedin.cdi.keys.S3Keys;
Expand Down Expand Up @@ -135,8 +135,8 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
synchronized S3Client getS3HttpClient(State state) {
if (s3Client == null) {
try {
Class<?> factoryClass = Class.forName(MultistageProperties.MSTAGE_S3_CLIENT_FACTORY.getValidNonblankWithDefault(state));
S3ClientFactory factory = (S3ClientFactory) factoryClass.newInstance();
Class<?> factoryClass = Class.forName(MultistageProperties.MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.getDeclaredConstructor().newInstance();

Integer connectionTimeout = s3SourceV2Keys.getConnectionTimeout();
AttributeMap config = connectionTimeout == null ? GLOBAL_HTTP_DEFAULTS
Expand All @@ -147,7 +147,7 @@ synchronized S3Client getS3HttpClient(State state) {
s3Client = S3Client.builder()
.region(this.s3SourceV2Keys.getRegion())
.endpointOverride(URI.create(s3SourceV2Keys.getEndpoint()))
.httpClient(factory.getHttpClient(state, config))
.httpClient(factory.getS3Client(state, config))
.credentialsProvider(getCredentialsProvider(state))
.build();
} catch (Exception e) {
Expand Down
Loading

0 comments on commit ffa3d4a

Please sign in to comment.