Skip to content

Commit

Permalink
Add a deprecated property check
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Oct 28, 2021
1 parent 18568ce commit 1f2bcb9
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package com.linkedin.cdi.configuration;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.SchemaUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.configuration.State;

import static com.linkedin.cdi.configuration.StaticConstants.*;
Expand Down Expand Up @@ -56,86 +58,8 @@ public boolean isValid(State state) {
StringProperties MSTAGE_CONNECTION_CLIENT_FACTORY = new StringProperties("ms.connection.client.factory",
"com.linkedin.cdi.factory.DefaultConnectionClientFactory");

// default: 0, minimum: 0, maximum: -
LongProperties MSTAGE_CONVERTER_CSV_MAX_FAILURES = new LongProperties("ms.converter.csv.max.failures") {
@Override
public boolean isValid(State state) {
return false;
}
};

// Decprecate this legacy property
LongProperties CSV_MAX_FAILURES = new LongProperties("csv.max.failures") {
@Override
public boolean isValid(State state) {
return false;
}
};

CsvProperties MSTAGE_CSV = new CsvProperties("ms.csv");

BooleanProperties MSTAGE_CONVERTER_KEEP_NULL_STRINGS = new BooleanProperties("ms.converter.keep.null.strings", Boolean.FALSE) {
@Override
public boolean isValid(State state) {
return false;
}
};

BooleanProperties MSTAGE_CSV_COLUMN_HEADER = new BooleanProperties("ms.csv.column.header", Boolean.FALSE) {
@Override
public boolean isValid(State state) {
return false;
}
};

// default: 0, minimum: 0, maximum: -
IntegerProperties MSTAGE_CSV_COLUMN_HEADER_INDEX = new IntegerProperties("ms.csv.column.header.index") {
@Override
public boolean isValid(State state) {
return false;
}
};

StringProperties MSTAGE_CSV_COLUMN_PROJECTION = new StringProperties("ms.csv.column.projection") {
@Override
public boolean isValid(State state) {
return false;
}
};

StringProperties MSTAGE_CSV_DEFAULT_FIELD_TYPE = new StringProperties("ms.csv.default.field.type") {
@Override
public boolean isValid(State state) {
return false;
}
};
StringProperties MSTAGE_CSV_ESCAPE_CHARACTER = new StringProperties("ms.csv.escape.character", "u005C") {
@Override
public boolean isValid(State state) {
return false;
}
};
StringProperties MSTAGE_CSV_QUOTE_CHARACTER = new StringProperties("ms.csv.quote.character", "\"") {
@Override
public boolean isValid(State state) {
return false;
}
};
StringProperties MSTAGE_CSV_SEPARATOR = new StringProperties("ms.csv.separator", KEY_WORD_COMMA) {
@Override
public boolean isValid(State state) {
return false;
}
};

// default: 0, minimum: 0, maximum: -
IntegerProperties MSTAGE_CSV_SKIP_LINES = new IntegerProperties("ms.csv.skip.lines") {
@Override
public boolean isValid(State state) {
return false;
}
};

BooleanProperties MSTAGE_DATA_EXPLICIT_EOF = new BooleanProperties("ms.data.explicit.eof", Boolean.FALSE);
JsonObjectProperties MSTAGE_DATA_DEFAULT_TYPE = new JsonObjectProperties("ms.data.default.type");
StringProperties MSTAGE_DATA_FIELD = new StringProperties("ms.data.field");
Expand Down Expand Up @@ -393,17 +317,7 @@ protected String getValidNonblankWithDefault(State state) {
MSTAGE_BACKFILL,
MSTAGE_CALL_INTERVAL_MILLIS,
MSTAGE_CONNECTION_CLIENT_FACTORY,
MSTAGE_CONVERTER_CSV_MAX_FAILURES,
MSTAGE_CONVERTER_KEEP_NULL_STRINGS,
MSTAGE_CSV,
MSTAGE_CSV_COLUMN_HEADER,
MSTAGE_CSV_COLUMN_HEADER_INDEX,
MSTAGE_CSV_COLUMN_PROJECTION,
MSTAGE_CSV_DEFAULT_FIELD_TYPE,
MSTAGE_CSV_ESCAPE_CHARACTER,
MSTAGE_CSV_QUOTE_CHARACTER,
MSTAGE_CSV_SEPARATOR,
MSTAGE_CSV_SKIP_LINES,
MSTAGE_DATA_EXPLICIT_EOF,
MSTAGE_DATA_DEFAULT_TYPE,
MSTAGE_DATA_FIELD,
Expand Down Expand Up @@ -464,7 +378,6 @@ protected String getValidNonblankWithDefault(State state) {
MSTAGE_WORK_UNIT_PARTIAL_PARTITION,
MSTAGE_WORK_UNIT_PARTITION,
CONVERTER_CLASSES,
CSV_MAX_FAILURES,
DATA_PUBLISHER_FINAL_DIR,
DATASET_URN,
ENCRYPT_KEY_LOC,
Expand All @@ -490,4 +403,26 @@ protected String getValidNonblankWithDefault(State state) {
TASK_MAXRETRIES,
TASKEXECUTOR_THREADPOOL_SIZE
);

Map<String, MultistageProperties<?>> deprecatedProperties =
new ImmutableMap.Builder<String, MultistageProperties<?>>()
.put("ms.csv.column.header", MSTAGE_CSV)
.put("ms.csv.column.header.index", MSTAGE_CSV)
.put("ms.csv.column.projection", MSTAGE_CSV)
.put("ms.csv.default.field.type", MSTAGE_CSV)
.put("ms.csv.escape.character", MSTAGE_CSV)
.put("ms.csv.quote.character", MSTAGE_CSV)
.put("ms.csv.separator", MSTAGE_CSV)
.put("ms.csv.skip.lines", MSTAGE_CSV)
.put("ms.converter.csv.max.failures", MSTAGE_CSV)
.put("ms.converter.keep.null.strings", MSTAGE_CSV)
.put("csv.max.failures", MSTAGE_CSV)
.put("sftpConn.timeout", MSTAGE_SFTP_CONN_TIMEOUT_MILLIS)
.put("gaap.http.maxConnectionsPerRoute", MSTAGE_HTTP_CONN_PER_ROUTE_MAX)
.put("gaap.http.maxConnections", MSTAGE_HTTP_CONN_MAX)
.put("gaap.trustStorePath", MSTAGE_SSL)
.put("gaap.authType", MSTAGE_CONNECTION_CLIENT_FACTORY)
.put("ms.kraken.enabled", MSTAGE_CONNECTION_CLIENT_FACTORY)
.put("gobblinGaapHttpClientFactory.authType", MSTAGE_CONNECTION_CLIENT_FACTORY)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public interface StaticConstants {
String EXCEPTION_WORK_UNIT_MINIMUM = "Job requires a minimum of %s work unit(s) to proceed because ms.work.unit.min.units = %s.";
String EXCEPTION_RECORD_MINIMUM = "Work unit requires a minimum of %s record(s) to succeed because ms.work.unit.min.records = %s.";
String EXCEPTION_INCORRECT_CONFIGURATION = "Property %s has incorrect configuration: %s, see: %s";
String EXCEPTION_DEPRECATED_CONFIGURATION = "Property %s has been deprecated, and the replacement is: %s, see: %s";

String MSG_ROWS_PROCESSED = "Processed %s records, work unit: %s";
String MSG_WORK_UNIT_ALWAYS = "There should be a work unit.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private String toCsv(final ResultSet resultSet, final ResultSetMetaData resultSe
for (int i = 0; i < resultSetMetadata.getColumnCount(); i++) {
builder.append(StringEscapeUtils.escapeCsv(JdbcUtils.parseColumnAsString(resultSet, resultSetMetadata, i + 1)));
if (i < resultSetMetadata.getColumnCount() - 1) {
builder.append(jdbcSourceKeys.getSeparator());
builder.append(MSTAGE_CSV.getFieldSeparator(getState()));
} else {
builder.append(System.lineSeparator());
}
Expand Down
27 changes: 0 additions & 27 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JdbcKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ public class JdbcKeys extends JobKeys {
private static final Logger LOG = LoggerFactory.getLogger(JdbcKeys.class);
private String jdbcStatement = null;
private JsonObject initialParameterValues = new JsonObject();
private String separator = MSTAGE_CSV_SEPARATOR.getDefaultValue();
private String quoteCharacter = MSTAGE_CSV_QUOTE_CHARACTER.getDefaultValue();
private String escapeCharacter = MSTAGE_CSV_ESCAPE_CHARACTER.getDefaultValue();
private String schemaRefactorFunction = MSTAGE_JDBC_SCHEMA_REFACTOR.getDefaultValue();

@Override
Expand All @@ -49,30 +46,6 @@ public void setInitialParameterValues(JsonObject initialParameterValues) {
this.initialParameterValues = initialParameterValues;
}

public String getSeparator() {
return separator;
}

public void setSeparator(String separator) {
this.separator = separator;
}

public String getQuoteCharacter() {
return quoteCharacter;
}

public void setQuoteCharacter(String quoteCharacter) {
this.quoteCharacter = quoteCharacter;
}

public String getEscapeCharacter() {
return escapeCharacter;
}

public void setEscapeCharacter(String escapeCharacter) {
this.escapeCharacter = escapeCharacter;
}

public String getSchemaRefactorFunction() {
return schemaRefactorFunction;
}
Expand Down
10 changes: 10 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ public boolean validate(State state) {
}
}

for (String deprecatedKey: deprecatedProperties.keySet()) {
if (state.contains(deprecatedKey) &&
StringUtils.isNotBlank(state.getProp(deprecatedKey, StringUtils.EMPTY))) {
LOG.error(String.format(EXCEPTION_DEPRECATED_CONFIGURATION, deprecatedKey,
deprecatedProperties.get(deprecatedKey).getConfig(),
deprecatedProperties.get(deprecatedKey).getDocUrl()));
allValid = false;
}
}

if(!allValid) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.cdi.connection.JdbcConnection;
import com.linkedin.cdi.extractor.MultistageExtractor;
import com.linkedin.cdi.keys.JdbcKeys;
import com.linkedin.cdi.util.CsvUtils;
import java.sql.Connection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -56,12 +55,6 @@ public JdbcSource() {
protected void initialize(State state) {
super.initialize(state);
jdbcSourceKeys.setJdbcStatement(MSTAGE_JDBC_STATEMENT.get(state));
jdbcSourceKeys.setSeparator(CsvUtils.unescape(MSTAGE_CSV_SEPARATOR
.get(state)));
jdbcSourceKeys.setQuoteCharacter(CsvUtils.unescape(MSTAGE_CSV_QUOTE_CHARACTER
.get(state)));
jdbcSourceKeys.setEscapeCharacter(CsvUtils.unescape(MSTAGE_CSV_ESCAPE_CHARACTER
.get(state)));
jdbcSourceKeys.setSchemaRefactorFunction(MSTAGE_JDBC_SCHEMA_REFACTOR
.get(state));
jdbcSourceKeys.logDebugAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public void testDefaultValues() {
SourceState state = new SourceState();
Assert.assertEquals(EXTRACT_IS_FULL.get(state), Boolean.FALSE);
Assert.assertEquals(MSTAGE_BACKFILL.get(state), Boolean.FALSE);
Assert.assertEquals(MSTAGE_CONVERTER_KEEP_NULL_STRINGS.get(state), Boolean.FALSE);
Assert.assertEquals(MSTAGE_CSV_COLUMN_HEADER .get(state), Boolean.FALSE);
Assert.assertEquals(MSTAGE_DATA_EXPLICIT_EOF.get(state), Boolean.FALSE);
Assert.assertEquals(MSTAGE_ENABLE_CLEANSING.get(state), Boolean.TRUE);
Assert.assertEquals(MSTAGE_ENABLE_DYNAMIC_FULL_LOAD.get(state), Boolean.TRUE);
Expand All @@ -66,9 +64,6 @@ public void testDefaultValues() {
Assert.assertEquals(MSTAGE_S3_LIST_MAX_KEYS.get(state).intValue(), 1000);
Assert.assertEquals(MSTAGE_NORMALIZER_BATCH_SIZE.get(state).longValue(), 500L);
Assert.assertEquals(MSTAGE_WAIT_TIMEOUT_SECONDS.get(state).longValue(), 600L);
Assert.assertEquals(MSTAGE_CSV_ESCAPE_CHARACTER.get(state), "u005C");
Assert.assertEquals(MSTAGE_CSV_QUOTE_CHARACTER.get(state), "\"");
Assert.assertEquals(MSTAGE_CSV_SEPARATOR.get(state), KEY_WORD_COMMA);
Assert.assertEquals(MSTAGE_JDBC_SCHEMA_REFACTOR.get(state), "none");
Assert.assertEquals(MSTAGE_SOURCE_DATA_CHARACTER_SET.get(state), "UTF-8");
Assert.assertEquals(MSTAGE_SOURCE_FILES_PATTERN.get(state), REGEXP_DEFAULT_PATTERN);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.keys;

import gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.junit.Assert;
import org.testng.annotations.Test;


public class JobKeysTestNoMock {
@Test
public void testValidate() {
State state = new SourceState();
state.setProp("csv.max.failures", "10");
Assert.assertFalse(new JobKeys().validate(state));
}
}

0 comments on commit 1f2bcb9

Please sign in to comment.