Skip to content

Commit

Permalink
Add a full validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Oct 14, 2021
1 parent bb136e2 commit fbefb17
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package com.linkedin.cdi.configuration;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.gobblin.configuration.State;

import static com.linkedin.cdi.configuration.StaticConstants.*;
Expand Down Expand Up @@ -170,7 +172,7 @@ public Long getMillis(State state) {

// Watermark and work-unit property definitions. Each constant is built from the
// string passed to its constructor (presumably the configuration key — confirm
// against the *Properties classes).
JsonArrayProperties MSTAGE_WATERMARK = new JsonArrayProperties("ms.watermark");
JsonArrayProperties MSTAGE_WATERMARK_GROUPS = new JsonArrayProperties("ms.watermark.groups");
// NOTE(review): the next two lines declare the same constant and differ only in the
// whitespace before '='. They look like the removed/added pair of a diff hunk; only
// one of them should exist in the real source file.
LongProperties MSTAGE_WORKUNIT_STARTTIME_KEY= new LongProperties("ms.work.unit.scheduling.starttime");
LongProperties MSTAGE_WORKUNIT_STARTTIME_KEY = new LongProperties("ms.work.unit.scheduling.starttime");
LongProperties MSTAGE_WORK_UNIT_MIN_RECORDS = new LongProperties("ms.work.unit.min.records");
LongProperties MSTAGE_WORK_UNIT_MIN_UNITS = new LongProperties("ms.work.unit.min.units");
IntegerProperties MSTAGE_WORK_UNIT_PACING_SECONDS = new IntegerProperties("ms.work.unit.pacing.seconds") {
Expand Down Expand Up @@ -216,4 +218,92 @@ protected String getValidNonblankWithDefault(State state) {
// Source connection proxy settings, both declared as string-typed properties.
StringProperties SOURCE_CONN_USE_PROXY_URL = new StringProperties("source.conn.use.proxy.url");
StringProperties SOURCE_CONN_USE_PROXY_PORT = new StringProperties("source.conn.use.proxy.port");
// NOTE(review): the second argument (Boolean.TRUE) is presumably the default value
// used when the property is not configured — confirm against BooleanProperties.
BooleanProperties STATE_STORE_ENABLED = new BooleanProperties("state.store.enabled", Boolean.TRUE);

/**
 * Registry of the job properties listed below, in the order given here.
 *
 * <p>Code that needs to visit every property (for example a validation loop)
 * iterates over this list, so a property that is not listed here is not reached
 * by such code. Keep the list in sync when a new property is declared.
 *
 * <p>The list is exposed as an unmodifiable view: this is a shared constant, and
 * allowing callers to add or remove entries would silently change what every
 * other consumer sees. Mutating calls throw {@link UnsupportedOperationException};
 * iteration and lookups behave as before.
 */
List<MultistageProperties> allProperties = Collections.unmodifiableList(Lists.newArrayList(
    MSTAGE_ABSTINENT_PERIOD_DAYS,
    MSTAGE_ACTIVATION_PROPERTY,
    MSTAGE_AUTHENTICATION,
    MSTAGE_BACKFILL,
    MSTAGE_CALL_INTERVAL_MILLIS,
    MSTAGE_CONNECTION_CLIENT_FACTORY,
    MSTAGE_CONVERTER_CSV_MAX_FAILURES,
    MSTAGE_CONVERTER_KEEP_NULL_STRINGS,
    MSTAGE_CSV_COLUMN_HEADER,
    MSTAGE_CSV_COLUMN_HEADER_INDEX,
    MSTAGE_CSV_COLUMN_PROJECTION,
    MSTAGE_CSV_DEFAULT_FIELD_TYPE,
    MSTAGE_CSV_ESCAPE_CHARACTER,
    MSTAGE_CSV_QUOTE_CHARACTER,
    MSTAGE_CSV_SEPARATOR,
    MSTAGE_CSV_SKIP_LINES,
    MSTAGE_DATA_EXPLICIT_EOF,
    MSTAGE_DATA_DEFAULT_TYPE,
    MSTAGE_DATA_FIELD,
    MSTAGE_DERIVED_FIELDS,
    MSTAGE_ENABLE_CLEANSING,
    MSTAGE_ENABLE_DYNAMIC_FULL_LOAD,
    MSTAGE_ENABLE_SCHEMA_BASED_FILTERING,
    MSTAGE_ENCRYPTION_FIELDS,
    MSTAGE_EXTRACTOR_CLASS,
    MSTAGE_EXTRACTOR_TARGET_FILE_NAME,
    MSTAGE_EXTRACTOR_TARGET_FILE_PERMISSION,
    MSTAGE_EXTRACT_PREPROCESSORS,
    MSTAGE_EXTRACT_PREPROCESSORS_PARAMETERS,
    MSTAGE_GRACE_PERIOD_DAYS,
    MSTAGE_HTTP_REQUEST_HEADERS,
    MSTAGE_HTTP_REQUEST_METHOD,
    MSTAGE_HTTP_RESPONSE_TYPE,
    MSTAGE_HTTP_STATUSES,
    MSTAGE_HTTP_STATUS_REASONS,
    MSTAGE_JDBC_SCHEMA_REFACTOR,
    MSTAGE_JDBC_STATEMENT,
    MSTAGE_KAFKA_BROKERS,
    MSTAGE_KAFKA_SCHEMA_REGISTRY_URL,
    MSTAGE_KAFKA_CLIENT_ID,
    MSTAGE_KAFKA_TOPIC_NAME,
    MSTAGE_NORMALIZER_BATCH_SIZE,
    MSTAGE_OUTPUT_SCHEMA,
    MSTAGE_PAGINATION,
    MSTAGE_PARAMETERS,
    MSTAGE_PAYLOAD_PROPERTY,
    MSTAGE_RETENTION,
    MSTAGE_S3_LIST_MAX_KEYS,
    MSTAGE_SCHEMA_CLENSING,
    MSTAGE_SECONDARY_INPUT,
    MSTAGE_SESSION_KEY_FIELD,
    MSTAGE_SOURCE_DATA_CHARACTER_SET,
    MSTAGE_SOURCE_FILES_PATTERN,
    MSTAGE_SOURCE_S3_PARAMETERS,
    MSTAGE_SOURCE_SCHEMA_URN,
    MSTAGE_SOURCE_URI,
    MSTAGE_TARGET_SCHEMA,
    MSTAGE_TARGET_SCHEMA_URN,
    MSTAGE_TOTAL_COUNT_FIELD,
    MSTAGE_VALIDATION_ATTRIBUTES,
    MSTAGE_WAIT_TIMEOUT_SECONDS,
    MSTAGE_WATERMARK,
    MSTAGE_WATERMARK_GROUPS,
    MSTAGE_WORKUNIT_STARTTIME_KEY,
    MSTAGE_WORK_UNIT_MIN_RECORDS,
    MSTAGE_WORK_UNIT_MIN_UNITS,
    MSTAGE_WORK_UNIT_PACING_SECONDS,
    MSTAGE_WORK_UNIT_PARALLELISM_MAX,
    MSTAGE_WORK_UNIT_PARTIAL_PARTITION,
    MSTAGE_WORK_UNIT_PARTITION,
    CONVERTER_CLASSES,
    DATA_PUBLISHER_FINAL_DIR,
    DATASET_URN_KEY,
    ENCRYPT_KEY_LOC,
    EXTRACTOR_CLASSES,
    EXTRACT_IS_FULL,
    EXTRACT_NAMESPACE_NAME_KEY,
    EXTRACT_TABLE_NAME_KEY,
    EXTRACT_TABLE_TYPE_KEY,
    SOURCE_CLASS,
    SOURCE_CONN_USERNAME,
    SOURCE_CONN_PASSWORD,
    SOURCE_CONN_USE_PROXY_URL,
    SOURCE_CONN_USE_PROXY_PORT,
    STATE_STORE_ENABLED
));
}
15 changes: 15 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,21 @@ public JobKeys setSourceSchema(JsonArray sourceSchema) {
* @return true if validation was successful, otherwise false
*/
public boolean validate(State state) {

// Validate all job parameters
boolean allValid = true;
for (MultistageProperties p: allProperties) {
if (!p.isBlank(state) && !p.isValid(state)) {
LOG.error(String.format(EXCEPTION_INCORRECT_CONFIGURATION,
p.getConfig(), state.getProp(p.getConfig())));
allValid = false;
}
}

if(!allValid) {
return false;
}

/**
* If pagination is enabled, we need one of following ways to stop pagination
* 1. through a total count field, i.e. ms.total.count.field = data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {
initialize(state);

if (!jobKeys.validate(state)) {
LOG.error("Some parameters are invalid, job will do nothing until they are fixed.");
return new ArrayList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.linkedin.cdi.configuration;

import com.google.gson.JsonObject;
import com.linkedin.cdi.keys.JobKeys;
import org.apache.gobblin.configuration.SourceState;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down

0 comments on commit fbefb17

Please sign in to comment.