Skip to content

Commit

Permalink
Switch to getProp for getNonblankWithDefault
Browse files (browse the repository at this point in the history)
  • Loading branch information
Chris Li committed Oct 14, 2021
1 parent 506da9a commit b200413
Show file tree
Hide file tree
Showing 36 changed files with 142 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public boolean validateNonblank(State state) {
* @param state state
* @return property value if non-blank and valid, otherwise the default value
*/
public Boolean getValidNonblankWithDefault(State state) {
protected Boolean getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return Boolean.parseBoolean(state.getProp(getConfig()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public boolean validateNonblank(State state) {
* @param state state
* @return property value if non-blank and valid, otherwise the default value
*/
public Integer getValidNonblankWithDefault(State state) {
protected Integer getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return Integer.parseInt(state.getProp(getConfig()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean validateNonblank(State state) {
* @param state state
* @return property value if non-blank and valid, otherwise the default value
*/
public JsonArray getValidNonblankWithDefault(State state) {
protected JsonArray getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return GSON.fromJson(state.getProp(getConfig()), JsonArray.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean validateNonblank(State state) {
* @param state state
* @return property value if non-blank and valid, otherwise the default value
*/
public JsonObject getValidNonblankWithDefault(State state) {
protected JsonObject getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return GSON.fromJson(state.getProp(getConfig()), JsonObject.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean validateNonblank(State state) {
* @return property value if non-blank and valid, otherwise the default value
* @see #getProp(State)
*/
public Long getValidNonblankWithDefault(State state) {
protected Long getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return Long.parseLong(state.getProp(getConfig()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,5 @@ public T getProp(State state) {
* @param state state
* @return subclasses should override
*/
abstract public T getValidNonblankWithDefault(State state);
abstract protected T getValidNonblankWithDefault(State state);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public interface PropertyCollection {
IntegerProperties MSTAGE_ABSTINENT_PERIOD_DAYS = new IntegerProperties("ms.abstinent.period.days") {
@Override
public Long getMillis(State state) {
return 24L * 3600L * 1000L * this.getValidNonblankWithDefault(state);
return 24L * 3600L * 1000L * this.getProp(state);
}
};

Expand Down Expand Up @@ -164,7 +164,7 @@ public JsonObject getDefaultValue() {
LongProperties MSTAGE_WAIT_TIMEOUT_SECONDS = new LongProperties("ms.wait.timeout.seconds", 600L) {
@Override
public Long getMillis(State state) {
return 1000L * this.getValidNonblankWithDefault(state);
return 1000L * this.getProp(state);
}
};

Expand Down Expand Up @@ -205,7 +205,7 @@ public boolean isValid(State state) {
StringProperties EXTRACT_TABLE_NAME_KEY = new StringProperties("extract.table.name");
StringProperties EXTRACT_TABLE_TYPE_KEY = new StringProperties("extract.table.type", "SNAPSHOT_ONLY") {
@Override
public String getValidNonblankWithDefault(State state) {
protected String getValidNonblankWithDefault(State state) {
return super.getValidNonblankWithDefault(state).toUpperCase();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public boolean validateNonblank(State state) {
* @return property value if non-blank and valid, otherwise the default value
* @see #getProp(State)
*/
public String getValidNonblankWithDefault(State state) {
protected String getValidNonblankWithDefault(State state) {
if (validateNonblank(state)) {
return state.getProp(getConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ synchronized HttpClient getHttpClient(State state) {
if (httpClient == null) {
try {
Class<?> factoryClass = Class.forName(
MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
MSTAGE_CONNECTION_CLIENT_FACTORY.getProp(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.newInstance();
httpClient = factory.getHttpClient(state);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ public WorkUnitStatus executeNext(WorkUnitStatus workUnitStatus) throws Retriabl
*/
private synchronized Connection getJdbcConnection(State state) {
try {
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getProp(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.newInstance();

return factory.getJdbcConnection(
jdbcSourceKeys.getSourceUri(),
SOURCE_CONN_USERNAME.getValidNonblankWithDefault(state),
SOURCE_CONN_PASSWORD.getValidNonblankWithDefault(state),
SOURCE_CONN_USERNAME.getProp(state),
SOURCE_CONN_PASSWORD.getProp(state),
state);
} catch (Exception e) {
LOG.error("Error creating Jdbc connection: {}", e.getMessage());
Expand Down Expand Up @@ -170,11 +170,11 @@ private WorkUnitStatus executeStatement(

if (stmt.execute(query)) {
ResultSet resultSet = stmt.getResultSet();
if (MSTAGE_EXTRACTOR_CLASS.getValidNonblankWithDefault(getState()).toString()
if (MSTAGE_EXTRACTOR_CLASS.getProp(getState()).toString()
.matches(".*JsonExtractor.*")) {
wuStatus.setBuffer(new ByteArrayInputStream(toJson(resultSet,
resultSet.getMetaData()).toString().getBytes(StandardCharsets.UTF_8)));
} else if (MSTAGE_EXTRACTOR_CLASS.getValidNonblankWithDefault(getState()).toString()
} else if (MSTAGE_EXTRACTOR_CLASS.getProp(getState()).toString()
.matches(".*CsvExtractor.*")) {
wuStatus.setBuffer(new ByteArrayInputStream(toCsv(resultSet,
resultSet.getMetaData()).getBytes(StandardCharsets.UTF_8)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
synchronized S3Client getS3HttpClient(State state) {
if (s3Client == null) {
try {
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(state));
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getProp(state));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.getDeclaredConstructor().newInstance();

Integer connectionTimeout = s3SourceV2Keys.getConnectionTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
private SftpClient getFsClient() {
if (this.fsClient == null) {
try {
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(this.getState()));
Class<?> factoryClass = Class.forName(MSTAGE_CONNECTION_CLIENT_FACTORY.getProp(this.getState()));
ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.getDeclaredConstructor().newInstance();
this.fsClient = factory.getSftpChannelClient(this.getState());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public Converter<Schema, Schema, GenericRecord, GenericRecord> init(WorkUnitStat
// Avro Array's max capacity is max int. In case of overflow, use the default value 500.
try {
maxRecordsPerBatch =
Math.toIntExact(MSTAGE_NORMALIZER_BATCH_SIZE.getValidNonblankWithDefault(workUnit));
Math.toIntExact(MSTAGE_NORMALIZER_BATCH_SIZE.getProp(workUnit));
} catch (ArithmeticException e) {
maxRecordsPerBatch = 500;
}

targetSchema = MSTAGE_TARGET_SCHEMA.getValidNonblankWithDefault(workUnit);
targetSchema = MSTAGE_TARGET_SCHEMA.getProp(workUnit);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void verifyAndUpdateCount(GenericRecord inputRecord) {

private void fillValidationAttributes(WorkUnitState workUnitState) {
JsonObject validationAttributes =
MSTAGE_VALIDATION_ATTRIBUTES.getValidNonblankWithDefault(workUnitState);
MSTAGE_VALIDATION_ATTRIBUTES.getProp(workUnitState);
if (validationAttributes.has(KEY_WORD_THRESHOLD)) {
threshold = validationAttributes.get(KEY_WORD_THRESHOLD).getAsInt();
}
Expand All @@ -119,7 +119,7 @@ private void fillValidationAttributes(WorkUnitState workUnitState) {
*/
private int getBaseRowCount(WorkUnitState workUnitState) {
JsonArray payloads = JsonUtils.filter(KEY_WORD_CATEGORY, KEY_WORD_PAYLOAD,
MSTAGE_SECONDARY_INPUT.getValidNonblankWithDefault(workUnitState));
MSTAGE_SECONDARY_INPUT.getProp(workUnitState));

// by default, we expect 1 record
if (payloads.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class JsonNormalizerConverter extends Converter<JsonArray, JsonArray, Jso

@Override
public Converter<JsonArray, JsonArray, JsonObject, JsonObject> init(WorkUnitState workUnit) {
maxRecordsPerBatch = MSTAGE_NORMALIZER_BATCH_SIZE.getValidNonblankWithDefault(workUnit);
targetSchema = MSTAGE_TARGET_SCHEMA.getValidNonblankWithDefault(workUnit);
maxRecordsPerBatch = MSTAGE_NORMALIZER_BATCH_SIZE.getProp(workUnit);
targetSchema = MSTAGE_TARGET_SCHEMA.getProp(workUnit);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Schema getSchema() {
@Override
protected void setRowFilter(JsonArray schemaArray) {
if (rowFilter == null) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getValidNonblankWithDefault(state)) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getProp(state)) {
rowFilter = new AvroSchemaBasedFilter(new JsonIntermediateSchema(jobKeys.getOutputSchema()),
avroExtractorKeys, state);
}
Expand Down
24 changes: 11 additions & 13 deletions cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,12 @@ public CsvExtractor(WorkUnitState state, JobKeys jobKeys) {
@Override
protected void initialize(ExtractorKeys keys) {
csvExtractorKeys.logUsage(state);
csvExtractorKeys.setColumnHeader(
MSTAGE_CSV_COLUMN_HEADER.validateNonblank(state) ? MSTAGE_CSV_COLUMN_HEADER.getProp(
state) : false);
csvExtractorKeys.setRowsToSkip(MSTAGE_CSV_SKIP_LINES.getValidNonblankWithDefault(state));
csvExtractorKeys.setColumnHeader(MSTAGE_CSV_COLUMN_HEADER.getProp(state));
csvExtractorKeys.setRowsToSkip(MSTAGE_CSV_SKIP_LINES.getProp(state));
if (csvExtractorKeys.getColumnHeader()) {
// only set the columnHeaderIndex if ms.csv.column.header is true
csvExtractorKeys.setColumnHeaderIndex(
MSTAGE_CSV_COLUMN_HEADER_INDEX.getValidNonblankWithDefault(state));
MSTAGE_CSV_COLUMN_HEADER_INDEX.getProp(state));
// if no explicit number of lines to skip is set, skip all lines up to the header by default
if (csvExtractorKeys.getRowsToSkip() == 0) {
csvExtractorKeys.setRowsToSkip(csvExtractorKeys.getColumnHeaderIndex() + 1);
Expand All @@ -96,20 +94,20 @@ protected void initialize(ExtractorKeys keys) {
}
}
csvExtractorKeys.setSeparator(
CsvUtils.unescape(MSTAGE_CSV_SEPARATOR.getValidNonblankWithDefault(state)));
CsvUtils.unescape(MSTAGE_CSV_SEPARATOR.getProp(state)));
csvExtractorKeys.setQuoteCharacter(
CsvUtils.unescape(MSTAGE_CSV_QUOTE_CHARACTER.getValidNonblankWithDefault(state)));
CsvUtils.unescape(MSTAGE_CSV_QUOTE_CHARACTER.getProp(state)));
csvExtractorKeys.setEscapeCharacter(
CsvUtils.unescape(MSTAGE_CSV_ESCAPE_CHARACTER.getValidNonblankWithDefault(state)));
CsvUtils.unescape(MSTAGE_CSV_ESCAPE_CHARACTER.getProp(state)));
csvExtractorKeys.setDefaultFieldType(
MSTAGE_CSV_DEFAULT_FIELD_TYPE.getValidNonblankWithDefault(state).toString().toLowerCase());
MSTAGE_CSV_DEFAULT_FIELD_TYPE.getProp(state).toString().toLowerCase());
csvExtractorKeys.setSampleRows(new ArrayDeque<>());

// check if user has defined the output schema
if (jobKeys.hasOutputSchema()) {
JsonArray outputSchema = jobKeys.getOutputSchema();
csvExtractorKeys.setColumnProjection(expandColumnProjection(MSTAGE_CSV_COLUMN_PROJECTION
.getValidNonblankWithDefault(state), outputSchema.size()));
.getProp(state), outputSchema.size()));
// initialize the column name to index map based on the schema when derived fields are present
if (jobKeys.getDerivedFields().entrySet().size() > 0) {
buildColumnToIndexMap(outputSchema);
Expand Down Expand Up @@ -213,7 +211,7 @@ protected boolean processInputStream(long starting) {

// if Content-Type is provided, but not text/csv, the response can have
// useful error information
JsonObject expectedContentType = MSTAGE_HTTP_RESPONSE_TYPE.getValidNonblankWithDefault(state);
JsonObject expectedContentType = MSTAGE_HTTP_RESPONSE_TYPE.getProp(state);
HashSet<String> expectedContentTypeSet = new LinkedHashSet<>(Arrays.asList("text/csv", "application/gzip"));
if (expectedContentType.has(CONTENT_TYPE_KEY) || expectedContentType.has(CONTENT_TYPE_KEY.toLowerCase())) {
for (Map.Entry<String, JsonElement> entry: expectedContentType.entrySet()) {
Expand All @@ -239,7 +237,7 @@ protected boolean processInputStream(long starting) {
.withEscapeChar(csvExtractorKeys.getEscapeCharacter().charAt(0))
.build();
CSVReader reader = new CSVReaderBuilder(new InputStreamReader(input, Charset.forName(
MSTAGE_SOURCE_DATA_CHARACTER_SET.getValidNonblankWithDefault(state)))).withCSVParser(parser)
MSTAGE_SOURCE_DATA_CHARACTER_SET.getProp(state)))).withCSVParser(parser)
.build();
Iterator<String[]> readerIterator = reader.iterator();

Expand Down Expand Up @@ -284,7 +282,7 @@ protected boolean processInputStream(long starting) {
@Override
protected void setRowFilter(JsonArray schemaArray) {
if (rowFilter == null) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getValidNonblankWithDefault(state)) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getProp(state)) {
rowFilter = new CsvSchemaBasedFilter(new JsonIntermediateSchema(schemaArray), csvExtractorKeys);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void initialize(ExtractorKeys keys) {

// file permission is required, but a default value is given in PropertyCollection
fileDumpExtractorKeys.setFileWritePermissions(
MSTAGE_EXTRACTOR_TARGET_FILE_PERMISSION.getValidNonblankWithDefault(state));
MSTAGE_EXTRACTOR_TARGET_FILE_PERMISSION.getProp(state));

// work unit file name is based on a template that is defined by ms.extractor.target.file.name
// and then substituted with activation parameters
Expand Down Expand Up @@ -217,7 +217,7 @@ private void writeToFileSystem(InputStream is, String dumplocation) {
* @return the file name
*/
private String getFileName(WorkUnitState state) {
String fileNameTemplate = MSTAGE_EXTRACTOR_TARGET_FILE_NAME.getValidNonblankWithDefault(state);
String fileNameTemplate = MSTAGE_EXTRACTOR_TARGET_FILE_NAME.getProp(state);
JsonObject activationParameters = extractorKeys.getActivationParameters();
try {
String filePath = VariableUtils.replaceWithTracking(fileNameTemplate, activationParameters).getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected boolean processInputStream(long starting) {

// if Content-Type is provided, but not application/json, the response can have
// useful error information
JsonObject expectedContentType = MSTAGE_HTTP_RESPONSE_TYPE.getValidNonblankWithDefault(state);
JsonObject expectedContentType = MSTAGE_HTTP_RESPONSE_TYPE.getProp(state);
HashSet<String> expectedContentTypeSet = new LinkedHashSet<>(Collections.singletonList("application/json"));
if (expectedContentType.has(CONTENT_TYPE_KEY)) {
for (Map.Entry<String, JsonElement> entry: expectedContentType.entrySet()) {
Expand Down Expand Up @@ -586,7 +586,7 @@ private JsonElement extractJson(InputStream input) throws UnsupportedCharsetExce
JsonElement data = null;
if (input != null) {
data = new JsonParser().parse(new InputStreamReader(input,
Charset.forName(MSTAGE_SOURCE_DATA_CHARACTER_SET.getValidNonblankWithDefault(state))));
Charset.forName(MSTAGE_SOURCE_DATA_CHARACTER_SET.getProp(state))));
connection.closeStream();
}
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ public MultistageExtractor(WorkUnitState state, JobKeys jobKeys) {

protected void initialize(ExtractorKeys keys) {
extractorKeys = keys;
extractorKeys.setActivationParameters(MSTAGE_ACTIVATION_PROPERTY.getValidNonblankWithDefault(state));
extractorKeys.setActivationParameters(MSTAGE_ACTIVATION_PROPERTY.getProp(state));
extractorKeys.setDelayStartTime(MSTAGE_WORKUNIT_STARTTIME_KEY.getProp(state));
extractorKeys.setExplictEof(MSTAGE_DATA_EXPLICIT_EOF.getValidNonblankWithDefault(state));
extractorKeys.setExplictEof(MSTAGE_DATA_EXPLICIT_EOF.getProp(state));
extractorKeys.setSignature(DATASET_URN_KEY.getProp(state));
extractorKeys.setPreprocessors(getPreprocessors(state));
extractorKeys.setPayloads(getPayloads(state));
Expand Down Expand Up @@ -292,7 +292,7 @@ protected boolean processInputStream(long starting) {
*/
protected void setRowFilter(JsonArray schemaArray) {
if (rowFilter == null) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getValidNonblankWithDefault(state)) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.getProp(state)) {
rowFilter = new JsonSchemaBasedFilter(new JsonIntermediateSchema(schemaArray));
}
}
Expand Down Expand Up @@ -372,8 +372,8 @@ protected void holdExecutionUnitPresetStartTime() {
List<StreamProcessor<?>> getPreprocessors(State state) {
ImmutableList.Builder<StreamProcessor<?>> builder = ImmutableList.builder();
JsonObject preprocessorsParams =
MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS.getValidNonblankWithDefault(state);
String preprocessors = MSTAGE_EXTRACT_PREPROCESSORS.getValidNonblankWithDefault(state);
MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS.getProp(state);
String preprocessors = MSTAGE_EXTRACT_PREPROCESSORS.getProp(state);
JsonObject preprocessorParams;
for (String preprocessor : preprocessors.split(COMMA_STR)) {
String p = preprocessor.trim();
Expand Down Expand Up @@ -563,7 +563,7 @@ protected String extractText(InputStream input) {
if (input != null) {
try {
data = InputStreamUtils.extractText(input,
MSTAGE_SOURCE_DATA_CHARACTER_SET.getValidNonblankWithDefault(state));
MSTAGE_SOURCE_DATA_CHARACTER_SET.getProp(state));
} catch (Exception e) {
LOG.debug(e.toString());
}
Expand Down Expand Up @@ -941,7 +941,7 @@ protected void logUsage(State state) {
LOG.info("Checking essential (not always mandatory) parameters...");
LOG.info("Values can be default values for the specific type if the property is not configured");
for (MultistageProperties p : JobKeys.ESSENTIAL_PARAMETERS) {
LOG.info("Property {} ({}) has value {} ", p.toString(), p.getClassName(), p.getValidNonblankWithDefault(state));
LOG.info("Property {} ({}) has value {} ", p.toString(), p.getClassName(), p.getProp(state));
}
}

Expand All @@ -953,7 +953,7 @@ protected void logUsage(State state) {
* @return the payload records
*/
protected JsonArray getPayloads(State state) {
JsonArray payloads = MSTAGE_PAYLOAD_PROPERTY.getValidNonblankWithDefault(state);
JsonArray payloads = MSTAGE_PAYLOAD_PROPERTY.getProp(state);
JsonArray records = new JsonArray();
for (JsonElement entry : payloads) {
records.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void logDebugAll(WorkUnit workUnit) {
public void logUsage(State state) {
super.logUsage(state);
for (MultistageProperties p: ESSENTIAL_PARAMETERS) {
LOG.info("Property {} ({}) has value {} ", p.toString(), p.getClassName(), p.getValidNonblankWithDefault(state));
LOG.info("Property {} ({}) has value {} ", p.toString(), p.getClassName(), p.getProp(state));
}
}
}
Loading

0 comments on commit b200413

Please sign in to comment.