Skip to content

Commit

Permalink
Add a secondary input property class
Browse files Browse the repository at this point in the history
  • Loading branch information
chris9692 committed Nov 23, 2021
1 parent 54ebdf0 commit 0defa73
Show file tree
Hide file tree
Showing 21 changed files with 394 additions and 220 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ To contribute, please use submit Pull Request (PR) for committers to merge.
- `git push origin +master`
- check your fork should be in sync with the main repository

# Detailed Documents
# User Guides

- [Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/summary.md)
- [Job Properties by Category](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/categories.md)
- [Deprecated Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/deprecated.md)
- [1. Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/summary.md)
- [2. Job Properties by Category](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/categories.md)
- [3. Deprecated Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/deprecated.md)
- [4. Flow Design Patterns](docs/patterns/summary.md)
- [5. How-tos](docs/how-to/summary.md)
- [6. Key Concepts](docs/concepts/summary.md)
- [7. Sample Flow Configurations](docs/sample-configs/summary.md)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package com.linkedin.cdi.configuration;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.VariableUtils;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.slf4j.Logger;
Expand Down Expand Up @@ -110,4 +113,22 @@ protected JsonArray getValidNonblankWithDefault(State state) {
}
return getDefaultValue();
}

/**
* Retrieves property value from state object if valid and not blank, then apply dynamic variables,
* otherwise, return default value of the property type
*
* @param state state
* @param parameters dynamic parameters
* @return JsonArray of the property with variables substituted
*/
public JsonArray get(State state, JsonObject parameters) {
String propertyValue = get(state).toString();
try {
propertyValue = VariableUtils.replaceWithTracking(propertyValue, parameters, false).getKey();
} catch (IOException e) {
LOG.error("Invalid parameter: " + parameters);
}
return GSON.fromJson(propertyValue, JsonArray.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public JsonObject getDefaultValue() {
IntegerProperties MSTAGE_S3_LIST_MAX_KEYS = new IntegerProperties("ms.s3.list.max.keys", 1000, Integer.MAX_VALUE, 1);

JsonObjectProperties MSTAGE_SCHEMA_CLEANSING = new JsonObjectProperties("ms.schema.cleansing");
JsonArrayProperties MSTAGE_SECONDARY_INPUT = new JsonArrayProperties("ms.secondary.input");
SecondaryInputProperties MSTAGE_SECONDARY_INPUT = new SecondaryInputProperties("ms.secondary.input");
JsonObjectProperties MSTAGE_SESSION_KEY_FIELD = new JsonObjectProperties("ms.session.key.field");

// default: 60 seconds, minimum: 0, maximum: -
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.configuration;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.HdfsReader;
import com.linkedin.cdi.util.JsonUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.configuration.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cdi.configuration.StaticConstants.*;


public class SecondaryInputProperties extends JsonArrayProperties {
private static final Logger LOG = LoggerFactory.getLogger(SecondaryInputProperties.class);
final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300;
final private static int RETRY_COUNT_DEFAULT = 3;

/**
* Constructor with implicit default value
* @param config property name
*/
SecondaryInputProperties(String config) {
super(config);
}

@Override
public boolean isValid(State state) {
final List<String> categories = Lists.newArrayList("authentication", "activation", "payload");
if (super.isValid(state) && !isBlank(state)) {
JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class);

// check categories, make sure they are spelled properly
for (JsonElement entry : value) {
if (!entry.isJsonObject() || !entry.getAsJsonObject().has(KEY_WORD_PATH)) {
return false;
}
}

// check categories, make sure they are spelled properly
for (JsonElement si : value) {
if (JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()) != JsonNull.INSTANCE) {
String category = JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()).getAsString();
if (categories.stream().noneMatch(x -> x.equals(category))) {
return false;
}
}
}
}
return super.isValid(state);
}

/**
* reads the authentication content
*
* @param state the state object
* @return the secondary input entries
*/
public Map<String, JsonArray> readAuthenticationToken(State state) {
Map<String, JsonArray> secondaryInputs = new HashMap<>();
JsonArray categoryData = secondaryInputs.computeIfAbsent(KEY_WORD_AUTHENTICATION, x -> new JsonArray());
categoryData.addAll(new HdfsReader(state).readSecondary(getAuthenticationDefinition(state).getAsJsonObject()));
return secondaryInputs;
}

/**
* Read authentication and activation secondary input records and payload definitions (not records)
*
* @return a set of JsonArrays of data read from locations specified in SECONDARY_INPUT
* property organized by category, in a Map<String, JsonArray> structure
*/
public Map<String, JsonArray> readAllContext(State state) {
Map<String, JsonArray> secondaryInputs = new HashMap<>();
for (JsonElement entry: get(state)) {
if (!entry.getAsJsonObject().has(KEY_WORD_PATH)) {
continue;
}

String category = entry.getAsJsonObject().has(KEY_WORD_CATEGORY)
? entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString()
: KEY_WORD_ACTIVATION;

JsonArray categoryData = secondaryInputs.computeIfAbsent(category, x -> new JsonArray());
if (category.equalsIgnoreCase(KEY_WORD_ACTIVATION) || category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
categoryData.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject()));
}

if (entry.getAsJsonObject().has(KEY_WORD_PATH) && category.equalsIgnoreCase(KEY_WORD_PAYLOAD)) {
categoryData.add(entry);
}

}
return secondaryInputs;
}

/**
* Check if authentication is configured in secondary input
* @return true if secondary input contains an authentication definition
*/
public boolean isAuthenticationEnabled(State state) {
return getAuthenticationDefinition(state).entrySet().size() > 0;
}

/**
* Get the authentication part of the secondary input,
* @param state state object
* @return the authentication secondary input
*/
private JsonObject getAuthenticationDefinition(State state) {
for (JsonElement entry : get(state)) {
if (entry.isJsonObject() && entry.getAsJsonObject().has(KEY_WORD_CATEGORY)) {
String category = entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString();
if (category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
return entry.getAsJsonObject();
}
}
}
return new JsonObject();
}

/**
* This method populates the retry parameters (delayInSec, retryCount) via the secondary input.
* These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't
* been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and
* RETRY_DEFAULT_COUNT.
*
* For e.g.
* ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"],
* "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]"
* @param state the state record
* @return the retry delay and count in a map structure
*/
public Map<String, Long> getAuthenticationRetry(State state) {
long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT;
long retryCount = RETRY_COUNT_DEFAULT;
Map<String, Long> retry = new HashMap<>();

if (JsonUtils.get(KEY_WORD_RETRY, getAuthenticationDefinition(state)) != JsonNull.INSTANCE) {
JsonObject retryFields = getAuthenticationDefinition(state).get(KEY_WORD_RETRY).getAsJsonObject();
retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC)
? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay;
retryCount = retryFields.has(KEY_WORD_RETRY_COUNT)
? retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount;
}

retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay);
retry.put(KEY_WORD_RETRY_COUNT, retryCount);
return retry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class WatermarkProperties extends JsonArrayProperties {
public boolean isValid(State state) {
final List<String> types = Lists.newArrayList("unit", "datetime");
if (super.isValid(state) && !isBlank(state)) {
// Derived fields should meet general JsonArray configuration requirements
// and contain only JsonObject items that each has a "name" element and a "formula" element
JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class);
if (value.size() == 0) {
return false;
Expand Down
87 changes: 2 additions & 85 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import com.linkedin.cdi.factory.ConnectionClientFactory;
import com.linkedin.cdi.factory.reader.SchemaReader;
import com.linkedin.cdi.util.DateTimeUtils;
import com.linkedin.cdi.util.HdfsReader;
import com.linkedin.cdi.util.JsonUtils;
import com.linkedin.cdi.util.ParameterTypes;
import com.linkedin.cdi.util.WorkUnitPartitionTypes;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -51,10 +49,6 @@
public class JobKeys {
private static final Logger LOG = LoggerFactory.getLogger(JobKeys.class);
final static public Gson GSON = new Gson();
final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300;
final private static int RETRY_COUNT_DEFAULT = 3;
final private static String ITEMS_KEY = "items";

private Map<String, Map<String, String>> derivedFields = new HashMap<>();
private Map<String, String> defaultFieldTypes = new HashMap<>();

Expand All @@ -81,7 +75,6 @@ public class JobKeys {
private long retryDelayInSec;
private long retryCount;
private Boolean isPartialPartition;
private JsonArray secondaryInputs = new JsonArray();
private WorkUnitPartitionTypes workUnitPartitionType;
private Boolean isSecondaryAuthenticationEnabled = false;
private String sourceUri = StringUtils.EMPTY;
Expand Down Expand Up @@ -128,11 +121,10 @@ public void initialize(State state) {
setIsPartialPartition(MSTAGE_WORK_UNIT_PARTIAL_PARTITION.get(state));
setWorkUnitPartitionType(parsePartitionType(state));
setWatermarkDefinition(MSTAGE_WATERMARK.get(state));
Map<String, Long> retry = parseSecondaryInputRetry(MSTAGE_SECONDARY_INPUT.get(state));
Map<String, Long> retry = MSTAGE_SECONDARY_INPUT.getAuthenticationRetry(state);
setRetryDelayInSec(retry.get(KEY_WORD_RETRY_DELAY_IN_SEC));
setRetryCount(retry.get(KEY_WORD_RETRY_COUNT));
setSecondaryInputs(MSTAGE_SECONDARY_INPUT.get(state));
setIsSecondaryAuthenticationEnabled(checkSecondaryAuthenticationEnabled());
setIsSecondaryAuthenticationEnabled(MSTAGE_SECONDARY_INPUT.isAuthenticationEnabled(state));

setSourceSchema(readSourceSchemaFromUrn(state, MSTAGE_SOURCE_SCHEMA_URN.get(state)));
setTargetSchema(readTargetSchemaFromUrn(state, MSTAGE_TARGET_SCHEMA_URN.get(state)));
Expand Down Expand Up @@ -464,73 +456,6 @@ WorkUnitPartitionTypes parsePartitionType(State state) {
return partitionType;
}

/**
* This method populates the retry parameters (delayInSec, retryCount) via the secondary input.
* These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't
* been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and
* RETRY_DEFAULT_COUNT.
*
* For e.g.
* ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"],
* "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]"
* @param jsonArray the raw secondary input
* @return the retry delay and count in a map structure
*/
private Map<String, Long> parseSecondaryInputRetry(JsonArray jsonArray) {
long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT;
long retryCount = RETRY_COUNT_DEFAULT;
Map<String, Long> retry = new HashMap<>();
for (JsonElement field: jsonArray) {
JsonObject retryFields = (JsonObject) field.getAsJsonObject().get(KEY_WORD_RETRY);
if (retryFields != null && !retryFields.isJsonNull()) {
retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC)
? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay;
retryCount = retryFields.has(KEY_WORD_RETRY_COUNT)
? retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount;
}
}
retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay);
retry.put(KEY_WORD_RETRY_COUNT, retryCount);
return retry;
}

/**
* Check if authentication is configured in secondary input
* @return true if secondary input contains an authentication definition
*/
protected boolean checkSecondaryAuthenticationEnabled() {
for (JsonElement entry: getSecondaryInputs()) {
if (entry.isJsonObject()
&& entry.getAsJsonObject().has(KEY_WORD_CATEGORY)
&& entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString()
.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) {
return true;
}
}
return false;
}

public Map<String, JsonArray> readSecondaryInputs(State state, final long retries) throws InterruptedException {
LOG.info("Trying to read secondary input with retry = {}", retries);
Map<String, JsonArray> secondaryInputs = readContext(state);

// Check if authentication is ready, and if not, whether retry is required
JsonArray authentications = secondaryInputs.get(KEY_WORD_AUTHENTICATION);
if ((authentications == null || authentications.size() == 0) && this.getIsSecondaryAuthenticationEnabled()
&& retries > 0) {
LOG.info("Authentication tokens are expected from secondary input, but not ready");
LOG.info("Will wait for {} seconds and then retry reading the secondary input", this.getRetryDelayInSec());
TimeUnit.SECONDS.sleep(this.getRetryDelayInSec());
return readSecondaryInputs(state, retries - 1);
}
LOG.info("Successfully read secondary input, no more retry");
return secondaryInputs;
}

private Map<String, JsonArray> readContext(State state) {
return new HdfsReader(state, this.getSecondaryInputs()).readAll();
}

/**
* Call the reader factory and read schema of the URN
* @param urn the dataset URN
Expand Down Expand Up @@ -749,14 +674,6 @@ public void setIsPartialPartition(Boolean partialPartition) {
isPartialPartition = partialPartition;
}

public JsonArray getSecondaryInputs() {
return secondaryInputs;
}

public void setSecondaryInputs(JsonArray secondaryInputs) {
this.secondaryInputs = secondaryInputs;
}

public WorkUnitPartitionTypes getWorkUnitPartitionType() {
return workUnitPartitionType;
}
Expand Down
Loading

0 comments on commit 0defa73

Please sign in to comment.