Skip to content

Commit

Permalink
Merge pull request #26 from chris9692/master
Browse files Browse the repository at this point in the history
Move functions reading, checking, and validating secondary input to SecondaryInputProperties
  • Loading branch information
chris9692 authored Nov 23, 2021
2 parents bef6714 + eef9949 commit 85f172b
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package com.linkedin.cdi.configuration;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.VariableUtils;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.slf4j.Logger;
Expand Down Expand Up @@ -110,4 +113,22 @@ protected JsonArray getValidNonblankWithDefault(State state) {
}
return getDefaultValue();
}

/**
* Retrieves property value from state object if valid and not blank, then apply dynamic variables,
* otherwise, return default value of the property type
*
* @param state state
* @param parameters dynamic parameters
* @return JsonArray of the property with variables substituted
*/
public JsonArray get(State state, JsonObject parameters) {
String propertyValue = get(state).toString();
try {
propertyValue = VariableUtils.replaceWithTracking(propertyValue, parameters, false).getKey();
} catch (IOException e) {
LOG.error("Invalid parameter: " + parameters);
}
return GSON.fromJson(propertyValue, JsonArray.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public JsonObject getDefaultValue() {
IntegerProperties MSTAGE_S3_LIST_MAX_KEYS = new IntegerProperties("ms.s3.list.max.keys", 1000, Integer.MAX_VALUE, 1);

JsonObjectProperties MSTAGE_SCHEMA_CLEANSING = new JsonObjectProperties("ms.schema.cleansing");
JsonArrayProperties MSTAGE_SECONDARY_INPUT = new JsonArrayProperties("ms.secondary.input");
SecondaryInputProperties MSTAGE_SECONDARY_INPUT = new SecondaryInputProperties("ms.secondary.input");
JsonObjectProperties MSTAGE_SESSION_KEY_FIELD = new JsonObjectProperties("ms.session.key.field");

// default: 60 seconds, minimum: 0, maximum: -
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.configuration;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.HdfsReader;
import com.linkedin.cdi.util.JsonUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.configuration.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cdi.configuration.StaticConstants.*;


public class SecondaryInputProperties extends JsonArrayProperties {
private static final Logger LOG = LoggerFactory.getLogger(SecondaryInputProperties.class);
final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300;
final private static int RETRY_COUNT_DEFAULT = 3;

/**
* Constructor with implicit default value
* @param config property name
*/
SecondaryInputProperties(String config) {
super(config);
}

@Override
public boolean isValid(State state) {
final List<String> categories = Lists.newArrayList("authentication", "activation", "payload");
if (super.isValid(state) && !isBlank(state)) {
JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class);

// check categories, make sure they are spelled properly
for (JsonElement entry : value) {
if (!entry.isJsonObject() || !entry.getAsJsonObject().has(KEY_WORD_PATH)) {
return false;
}
}

// check categories, make sure they are spelled properly
for (JsonElement si : value) {
if (JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()) != JsonNull.INSTANCE) {
String category = JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()).getAsString();
if (categories.stream().noneMatch(x -> x.equals(category))) {
return false;
}
}
}
}
return super.isValid(state);
}

/**
* reads the authentication content
*
* @param state the state object
* @return the secondary input entries
*/
public Map<String, JsonArray> readAuthenticationToken(State state) {
Map<String, JsonArray> secondaryInputs = new HashMap<>();
JsonArray categoryData = secondaryInputs.computeIfAbsent(KEY_WORD_AUTHENTICATION, x -> new JsonArray());
categoryData.addAll(new HdfsReader(state).readSecondary(getAuthenticationDefinition(state).getAsJsonObject()));
return secondaryInputs;
}

/**
* Read authentication and activation secondary input records and payload definitions (not records)
*
* @return a set of JsonArrays of data read from locations specified in SECONDARY_INPUT
* property organized by category, in a Map<String, JsonArray> structure
*/
public Map<String, JsonArray> readAllContext(State state) {
Map<String, JsonArray> secondaryInputs = new HashMap<>();
for (JsonElement entry: get(state)) {
if (!entry.getAsJsonObject().has(KEY_WORD_PATH)) {
continue;
}

String category = entry.getAsJsonObject().has(KEY_WORD_CATEGORY)
? entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString()
: KEY_WORD_ACTIVATION;

JsonArray categoryData = secondaryInputs.computeIfAbsent(category, x -> new JsonArray());
if (category.equalsIgnoreCase(KEY_WORD_ACTIVATION) || category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
categoryData.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject()));
}

if (entry.getAsJsonObject().has(KEY_WORD_PATH) && category.equalsIgnoreCase(KEY_WORD_PAYLOAD)) {
categoryData.add(entry);
}

}
return secondaryInputs;
}

/**
* Check if authentication is configured in secondary input
* @return true if secondary input contains an authentication definition
*/
public boolean isAuthenticationEnabled(State state) {
return getAuthenticationDefinition(state).entrySet().size() > 0;
}

/**
* Get the authentication part of the secondary input,
* @param state state object
* @return the authentication secondary input
*/
private JsonObject getAuthenticationDefinition(State state) {
for (JsonElement entry : get(state)) {
if (entry.isJsonObject() && entry.getAsJsonObject().has(KEY_WORD_CATEGORY)) {
String category = entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString();
if (category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
return entry.getAsJsonObject();
}
}
}
return new JsonObject();
}

/**
* This method populates the retry parameters (delayInSec, retryCount) via the secondary input.
* These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't
* been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and
* RETRY_DEFAULT_COUNT.
*
* For e.g.
* ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"],
* "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]"
* @param state the state record
* @return the retry delay and count in a map structure
*/
public Map<String, Long> getAuthenticationRetry(State state) {
long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT;
long retryCount = RETRY_COUNT_DEFAULT;
Map<String, Long> retry = new HashMap<>();

if (JsonUtils.get(KEY_WORD_RETRY, getAuthenticationDefinition(state)) != JsonNull.INSTANCE) {
JsonObject retryFields = getAuthenticationDefinition(state).get(KEY_WORD_RETRY).getAsJsonObject();
retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC)
? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay;
retryCount = retryFields.has(KEY_WORD_RETRY_COUNT)
? retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount;
}

retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay);
retry.put(KEY_WORD_RETRY_COUNT, retryCount);
return retry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class WatermarkProperties extends JsonArrayProperties {
public boolean isValid(State state) {
final List<String> types = Lists.newArrayList("unit", "datetime");
if (super.isValid(state) && !isBlank(state)) {
// Derived fields should meet general JsonArray configuration requirements
// and contain only JsonObject items that each has a "name" element and a "formula" element
JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class);
if (value.size() == 0) {
return false;
Expand Down
87 changes: 2 additions & 85 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import com.linkedin.cdi.factory.ConnectionClientFactory;
import com.linkedin.cdi.factory.reader.SchemaReader;
import com.linkedin.cdi.util.DateTimeUtils;
import com.linkedin.cdi.util.HdfsReader;
import com.linkedin.cdi.util.JsonUtils;
import com.linkedin.cdi.util.ParameterTypes;
import com.linkedin.cdi.util.WorkUnitPartitionTypes;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -51,10 +49,6 @@
public class JobKeys {
private static final Logger LOG = LoggerFactory.getLogger(JobKeys.class);
final static public Gson GSON = new Gson();
final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300;
final private static int RETRY_COUNT_DEFAULT = 3;
final private static String ITEMS_KEY = "items";

private Map<String, Map<String, String>> derivedFields = new HashMap<>();
private Map<String, String> defaultFieldTypes = new HashMap<>();

Expand All @@ -81,7 +75,6 @@ public class JobKeys {
private long retryDelayInSec;
private long retryCount;
private Boolean isPartialPartition;
private JsonArray secondaryInputs = new JsonArray();
private WorkUnitPartitionTypes workUnitPartitionType;
private Boolean isSecondaryAuthenticationEnabled = false;
private String sourceUri = StringUtils.EMPTY;
Expand Down Expand Up @@ -128,11 +121,10 @@ public void initialize(State state) {
setIsPartialPartition(MSTAGE_WORK_UNIT_PARTIAL_PARTITION.get(state));
setWorkUnitPartitionType(parsePartitionType(state));
setWatermarkDefinition(MSTAGE_WATERMARK.get(state));
Map<String, Long> retry = parseSecondaryInputRetry(MSTAGE_SECONDARY_INPUT.get(state));
Map<String, Long> retry = MSTAGE_SECONDARY_INPUT.getAuthenticationRetry(state);
setRetryDelayInSec(retry.get(KEY_WORD_RETRY_DELAY_IN_SEC));
setRetryCount(retry.get(KEY_WORD_RETRY_COUNT));
setSecondaryInputs(MSTAGE_SECONDARY_INPUT.get(state));
setIsSecondaryAuthenticationEnabled(checkSecondaryAuthenticationEnabled());
setIsSecondaryAuthenticationEnabled(MSTAGE_SECONDARY_INPUT.isAuthenticationEnabled(state));

setSourceSchema(readSourceSchemaFromUrn(state, MSTAGE_SOURCE_SCHEMA_URN.get(state)));
setTargetSchema(readTargetSchemaFromUrn(state, MSTAGE_TARGET_SCHEMA_URN.get(state)));
Expand Down Expand Up @@ -464,73 +456,6 @@ WorkUnitPartitionTypes parsePartitionType(State state) {
return partitionType;
}

/**
* This method populates the retry parameters (delayInSec, retryCount) via the secondary input.
* These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't
* been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and
* RETRY_DEFAULT_COUNT.
*
* For e.g.
* ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"],
* "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]"
* @param jsonArray the raw secondary input
* @return the retry delay and count in a map structure
*/
private Map<String, Long> parseSecondaryInputRetry(JsonArray jsonArray) {
long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT;
long retryCount = RETRY_COUNT_DEFAULT;
Map<String, Long> retry = new HashMap<>();
for (JsonElement field: jsonArray) {
JsonObject retryFields = (JsonObject) field.getAsJsonObject().get(KEY_WORD_RETRY);
if (retryFields != null && !retryFields.isJsonNull()) {
retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC)
? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay;
retryCount = retryFields.has(KEY_WORD_RETRY_COUNT)
? retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount;
}
}
retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay);
retry.put(KEY_WORD_RETRY_COUNT, retryCount);
return retry;
}

/**
* Check if authentication is configured in secondary input
* @return true if secondary input contains an authentication definition
*/
protected boolean checkSecondaryAuthenticationEnabled() {
for (JsonElement entry: getSecondaryInputs()) {
if (entry.isJsonObject()
&& entry.getAsJsonObject().has(KEY_WORD_CATEGORY)
&& entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString()
.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
return true;
}
}
return false;
}

public Map<String, JsonArray> readSecondaryInputs(State state, final long retries) throws InterruptedException {
LOG.info("Trying to read secondary input with retry = {}", retries);
Map<String, JsonArray> secondaryInputs = readContext(state);

// Check if authentication is ready, and if not, whether retry is required
JsonArray authentications = secondaryInputs.get(KEY_WORD_AUTHENTICATION);
if ((authentications == null || authentications.size() == 0) && this.getIsSecondaryAuthenticationEnabled()
&& retries > 0) {
LOG.info("Authentication tokens are expected from secondary input, but not ready");
LOG.info("Will wait for {} seconds and then retry reading the secondary input", this.getRetryDelayInSec());
TimeUnit.SECONDS.sleep(this.getRetryDelayInSec());
return readSecondaryInputs(state, retries - 1);
}
LOG.info("Successfully read secondary input, no more retry");
return secondaryInputs;
}

private Map<String, JsonArray> readContext(State state) {
return new HdfsReader(state, this.getSecondaryInputs()).readAll();
}

/**
* Call the reader factory and read schema of the URN
* @param urn the dataset URN
Expand Down Expand Up @@ -749,14 +674,6 @@ public void setIsPartialPartition(Boolean partialPartition) {
isPartialPartition = partialPartition;
}

public JsonArray getSecondaryInputs() {
return secondaryInputs;
}

public void setSecondaryInputs(JsonArray secondaryInputs) {
this.secondaryInputs = secondaryInputs;
}

public WorkUnitPartitionTypes getWorkUnitPartitionType() {
return workUnitPartitionType;
}
Expand Down
Loading

0 comments on commit 85f172b

Please sign in to comment.