diff --git a/README.md b/README.md index 7de3150..8710018 100644 --- a/README.md +++ b/README.md @@ -62,8 +62,12 @@ To contribute, please use submit Pull Request (PR) for committers to merge. - `git push origin +master` - check your fork should be in sync with the main repository -# Detailed Documents +# User Guides -- [Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/summary.md) -- [Job Properties by Category](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/categories.md) -- [Deprecated Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/deprecated.md) \ No newline at end of file +- [1. Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/summary.md) +- [2. Job Properties by Category](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/categories.md) +- [3. Deprecated Job Properties](https://github.com/linkedin/data-integration-library/blob/master/docs/parameters/deprecated.md) +- [4. Flow Design Patterns](docs/patterns/summary.md) +- [5. How-tos](docs/how-to/summary.md) +- [6. Key Concepts](docs/concepts/summary.md) +- [7. Sample Flow Configurations](docs/sample-configs/summary.md) \ No newline at end of file diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/JsonArrayProperties.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/JsonArrayProperties.java index 868755b..87ddcd0 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/JsonArrayProperties.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/JsonArrayProperties.java @@ -5,6 +5,9 @@ package com.linkedin.cdi.configuration; import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.linkedin.cdi.util.VariableUtils; +import java.io.IOException; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.State; import org.slf4j.Logger; @@ -110,4 +113,22 @@ protected JsonArray getValidNonblankWithDefault(State state) { } return getDefaultValue(); } + + /** + * Retrieves property value from state object if valid and not blank, then apply dynamic variables, + * otherwise, return default value of the property type + * + * @param state state + * @param parameters dynamic parameters + * @return JsonArray of the property with variables substituted + */ + public JsonArray get(State state, JsonObject parameters) { + String propertyValue = get(state).toString(); + try { + propertyValue = VariableUtils.replaceWithTracking(propertyValue, parameters, false).getKey(); + } catch (IOException e) { + LOG.error("Invalid parameter: " + parameters); + } + return GSON.fromJson(propertyValue, JsonArray.class); + } } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java index 7918a5d..94a41bb 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java @@ -222,7 +222,7 @@ public JsonObject getDefaultValue() { IntegerProperties MSTAGE_S3_LIST_MAX_KEYS = new IntegerProperties("ms.s3.list.max.keys", 1000, Integer.MAX_VALUE, 1); JsonObjectProperties MSTAGE_SCHEMA_CLEANSING = new JsonObjectProperties("ms.schema.cleansing"); - JsonArrayProperties MSTAGE_SECONDARY_INPUT = new JsonArrayProperties("ms.secondary.input"); + SecondaryInputProperties 
MSTAGE_SECONDARY_INPUT = new SecondaryInputProperties("ms.secondary.input"); JsonObjectProperties MSTAGE_SESSION_KEY_FIELD = new JsonObjectProperties("ms.session.key.field"); // default: 60 seconds, minimum: 0, maximum: - diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/SecondaryInputProperties.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/SecondaryInputProperties.java new file mode 100644 index 0000000..96708fe --- /dev/null +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/SecondaryInputProperties.java @@ -0,0 +1,160 @@ +// Copyright 2021 LinkedIn Corporation. All rights reserved. +// Licensed under the BSD-2 Clause license. +// See LICENSE in the project root for license information. + +package com.linkedin.cdi.configuration; + +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; +import com.linkedin.cdi.util.HdfsReader; +import com.linkedin.cdi.util.JsonUtils; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.gobblin.configuration.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.linkedin.cdi.configuration.StaticConstants.*; + + +public class SecondaryInputProperties extends JsonArrayProperties { + private static final Logger LOG = LoggerFactory.getLogger(SecondaryInputProperties.class); + final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300; + final private static int RETRY_COUNT_DEFAULT = 3; + + /** + * Constructor with implicit default value + * @param config property name + */ + SecondaryInputProperties(String config) { + super(config); + } + + @Override + public boolean isValid(State state) { + final List categories = Lists.newArrayList("authentication", "activation", "payload"); + if (super.isValid(state) && !isBlank(state)) { + JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class); + + // check categories, make sure they are spelled properly + for (JsonElement entry : value) { + if (!entry.isJsonObject() || !entry.getAsJsonObject().has(KEY_WORD_PATH)) { + return false; + } + } + + // check categories, make sure they are spelled properly + for (JsonElement si : value) { + if (JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()) != JsonNull.INSTANCE) { + String category = JsonUtils.get(KEY_WORD_CATEGORY, si.getAsJsonObject()).getAsString(); + if (categories.stream().noneMatch(x -> x.equals(category))) { + return false; + } + } + } + } + return super.isValid(state); + } + + /** + * reads the authentication content + * + * @param state the state object + * @return the secondary input entries + */ + public Map readAuthenticationToken(State state) { + Map secondaryInputs = new HashMap<>(); + JsonArray categoryData = secondaryInputs.computeIfAbsent(KEY_WORD_AUTHENTICATION, x -> new JsonArray()); + categoryData.addAll(new HdfsReader(state).readSecondary(getAuthenticationDefinition(state).getAsJsonObject())); + return secondaryInputs; + } + + /** + * Read authentication and activation secondary input records and payload definitions (not records) + * + * @return a set of JsonArrays of data read from locations specified in SECONDARY_INPUT + * property organized by category, in a Map structure + */ + public Map readAllContext(State state) { + Map secondaryInputs = new HashMap<>(); + for (JsonElement entry: get(state)) { + if (!entry.getAsJsonObject().has(KEY_WORD_PATH)) { + continue; + } + + String category = 
entry.getAsJsonObject().has(KEY_WORD_CATEGORY) + ? entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString() + : KEY_WORD_ACTIVATION; + + JsonArray categoryData = secondaryInputs.computeIfAbsent(category, x -> new JsonArray()); + if (category.equalsIgnoreCase(KEY_WORD_ACTIVATION) || category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) { + categoryData.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject())); + } + + if (entry.getAsJsonObject().has(KEY_WORD_PATH) && category.equalsIgnoreCase(KEY_WORD_PAYLOAD)) { + categoryData.add(entry); + } + + } + return secondaryInputs; + } + + /** + * Check if authentication is configured in secondary input + * @return true if secondary input contains an authentication definition + */ + public boolean isAuthenticationEnabled(State state) { + return getAuthenticationDefinition(state).entrySet().size() > 0; + } + + /** + * Get the authentication part of the secondary input, + * @param state state object + * @return the authentication secondary input + */ + private JsonObject getAuthenticationDefinition(State state) { + for (JsonElement entry : get(state)) { + if (entry.isJsonObject() && entry.getAsJsonObject().has(KEY_WORD_CATEGORY)) { + String category = entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString(); + if (category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) { + return entry.getAsJsonObject(); + } + } + } + return new JsonObject(); + } + + /** + * This method populates the retry parameters (delayInSec, retryCount) via the secondary input. + * These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't + * been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and + * RETRY_DEFAULT_COUNT. + * + * For e.g. + * ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"], + * "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]" + * @param state the state record + * @return the retry delay and count in a map structure + */ + public Map getAuthenticationRetry(State state) { + long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT; + long retryCount = RETRY_COUNT_DEFAULT; + Map retry = new HashMap<>(); + + if (JsonUtils.get(KEY_WORD_RETRY, getAuthenticationDefinition(state)) != JsonNull.INSTANCE) { + JsonObject retryFields = getAuthenticationDefinition(state).get(KEY_WORD_RETRY).getAsJsonObject(); + retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC) + ? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay; + retryCount = retryFields.has(KEY_WORD_RETRY_COUNT) + ? 
retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount; + } + + retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay); + retry.put(KEY_WORD_RETRY_COUNT, retryCount); + return retry; + } +} diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/WatermarkProperties.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/WatermarkProperties.java index fdfcebd..efda549 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/WatermarkProperties.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/WatermarkProperties.java @@ -36,8 +36,6 @@ public class WatermarkProperties extends JsonArrayProperties { public boolean isValid(State state) { final List types = Lists.newArrayList("unit", "datetime"); if (super.isValid(state) && !isBlank(state)) { - // Derived fields should meet general JsonArray configuration requirements - // and contain only JsonObject items that each has a "name" element and a "formula" element JsonArray value = GSON.fromJson(state.getProp(getConfig()), JsonArray.class); if (value.size() == 0) { return false; diff --git a/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java b/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java index 3264119..f79d7c5 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java @@ -15,7 +15,6 @@ import com.linkedin.cdi.factory.ConnectionClientFactory; import com.linkedin.cdi.factory.reader.SchemaReader; import com.linkedin.cdi.util.DateTimeUtils; -import com.linkedin.cdi.util.HdfsReader; import com.linkedin.cdi.util.JsonUtils; import com.linkedin.cdi.util.ParameterTypes; import com.linkedin.cdi.util.WorkUnitPartitionTypes; @@ -23,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.State; import org.joda.time.DateTime; @@ -51,10 +49,6 @@ public class JobKeys { private static final Logger LOG = LoggerFactory.getLogger(JobKeys.class); final static public Gson GSON = new Gson(); - final private static int RETRY_DELAY_IN_SEC_DEFAULT = 300; - final private static int RETRY_COUNT_DEFAULT = 3; - final private static String ITEMS_KEY = "items"; - private Map> derivedFields = new HashMap<>(); private Map defaultFieldTypes = new HashMap<>(); @@ -81,7 +75,6 @@ public class JobKeys { private long retryDelayInSec; private long retryCount; private Boolean isPartialPartition; - private JsonArray secondaryInputs = new JsonArray(); private WorkUnitPartitionTypes workUnitPartitionType; private Boolean isSecondaryAuthenticationEnabled = false; private String sourceUri = StringUtils.EMPTY; @@ -128,11 +121,10 @@ public void initialize(State state) { setIsPartialPartition(MSTAGE_WORK_UNIT_PARTIAL_PARTITION.get(state)); setWorkUnitPartitionType(parsePartitionType(state)); setWatermarkDefinition(MSTAGE_WATERMARK.get(state)); - Map retry = parseSecondaryInputRetry(MSTAGE_SECONDARY_INPUT.get(state)); + Map retry = MSTAGE_SECONDARY_INPUT.getAuthenticationRetry(state); setRetryDelayInSec(retry.get(KEY_WORD_RETRY_DELAY_IN_SEC)); setRetryCount(retry.get(KEY_WORD_RETRY_COUNT)); - setSecondaryInputs(MSTAGE_SECONDARY_INPUT.get(state)); - setIsSecondaryAuthenticationEnabled(checkSecondaryAuthenticationEnabled()); + setIsSecondaryAuthenticationEnabled(MSTAGE_SECONDARY_INPUT.isAuthenticationEnabled(state)); setSourceSchema(readSourceSchemaFromUrn(state, MSTAGE_SOURCE_SCHEMA_URN.get(state))); 
setTargetSchema(readTargetSchemaFromUrn(state, MSTAGE_TARGET_SCHEMA_URN.get(state))); @@ -464,73 +456,6 @@ WorkUnitPartitionTypes parsePartitionType(State state) { return partitionType; } - /** - * This method populates the retry parameters (delayInSec, retryCount) via the secondary input. - * These values are used to retry connection whenever the "authentication" type category is defined and the token hasn't - * been populated yet. If un-defined, they will retain the default values as specified by RETRY_DEFAULT_DELAY and - * RETRY_DEFAULT_COUNT. - * - * For e.g. - * ms.secondary.input : "[{"path": "/util/avro_retry", "fields": ["uuid"], - * "category": "authentication", "retry": {"delayInSec" : "1", "retryCount" : "2"}}]" - * @param jsonArray the raw secondary input - * @return the retry delay and count in a map structure - */ - private Map parseSecondaryInputRetry(JsonArray jsonArray) { - long retryDelay = RETRY_DELAY_IN_SEC_DEFAULT; - long retryCount = RETRY_COUNT_DEFAULT; - Map retry = new HashMap<>(); - for (JsonElement field: jsonArray) { - JsonObject retryFields = (JsonObject) field.getAsJsonObject().get(KEY_WORD_RETRY); - if (retryFields != null && !retryFields.isJsonNull()) { - retryDelay = retryFields.has(KEY_WORD_RETRY_DELAY_IN_SEC) - ? retryFields.get(KEY_WORD_RETRY_DELAY_IN_SEC).getAsLong() : retryDelay; - retryCount = retryFields.has(KEY_WORD_RETRY_COUNT) - ? retryFields.get(KEY_WORD_RETRY_COUNT).getAsLong() : retryCount; - } - } - retry.put(KEY_WORD_RETRY_DELAY_IN_SEC, retryDelay); - retry.put(KEY_WORD_RETRY_COUNT, retryCount); - return retry; - } - - /** - * Check if authentication is configured in secondary input - * @return true if secondary input contains an authentication definition - */ - protected boolean checkSecondaryAuthenticationEnabled() { - for (JsonElement entry: getSecondaryInputs()) { - if (entry.isJsonObject() - && entry.getAsJsonObject().has(KEY_WORD_CATEGORY) - && entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString() - .equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) { - return true; - } - } - return false; - } - - public Map readSecondaryInputs(State state, final long retries) throws InterruptedException { - LOG.info("Trying to read secondary input with retry = {}", retries); - Map secondaryInputs = readContext(state); - - // Check if authentication is ready, and if not, whether retry is required - JsonArray authentications = secondaryInputs.get(KEY_WORD_AUTHENTICATION); - if ((authentications == null || authentications.size() == 0) && this.getIsSecondaryAuthenticationEnabled() - && retries > 0) { - LOG.info("Authentication tokens are expected from secondary input, but not ready"); - LOG.info("Will wait for {} seconds and then retry reading the secondary input", this.getRetryDelayInSec()); - TimeUnit.SECONDS.sleep(this.getRetryDelayInSec()); - return readSecondaryInputs(state, retries - 1); - } - LOG.info("Successfully read secondary input, no more retry"); - return secondaryInputs; - } - - private Map readContext(State state) { - return new HdfsReader(state, this.getSecondaryInputs()).readAll(); - } - /** * Call the reader factory and read schema of the URN * @param urn the dataset URN @@ -749,14 +674,6 @@ public void setIsPartialPartition(Boolean partialPartition) { isPartialPartition = partialPartition; } - public JsonArray getSecondaryInputs() { - return secondaryInputs; - } - - public void setSecondaryInputs(JsonArray secondaryInputs) { - this.secondaryInputs = secondaryInputs; - } - public WorkUnitPartitionTypes getWorkUnitPartitionType() { return 
workUnitPartitionType; } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/source/MultistageSource.java b/cdi-core/src/main/java/com/linkedin/cdi/source/MultistageSource.java index bc91a57..bff29ec 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/source/MultistageSource.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/source/MultistageSource.java @@ -14,7 +14,6 @@ import com.linkedin.cdi.extractor.MultistageExtractor; import com.linkedin.cdi.keys.JobKeys; import com.linkedin.cdi.util.EndecoUtils; -import com.linkedin.cdi.util.HdfsReader; import com.linkedin.cdi.util.WatermarkDefinition; import java.lang.reflect.Constructor; import java.util.ArrayList; @@ -23,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -127,7 +125,7 @@ public List getWorkunits(SourceState state) { jobKeys.getWorkUnitPartitionType())); } - Map secondaryInputs = readSecondaryInputs(sourceState, jobKeys.getRetryCount()); + Map secondaryInputs = MSTAGE_SECONDARY_INPUT.readAllContext(sourceState); JsonArray authentications = secondaryInputs.get(KEY_WORD_AUTHENTICATION); JsonArray activations = secondaryInputs.computeIfAbsent(KEY_WORD_ACTIVATION, x -> new JsonArray()); JsonArray payloads = secondaryInputs.computeIfAbsent(KEY_WORD_PAYLOAD, x -> new JsonArray()); @@ -157,45 +155,18 @@ public List getWorkunits(SourceState state) { jobKeys.getMinWorkUnits())); } - if (authentications != null && authentications.size() == 1) { - for (WorkUnit wu : wuList) { + for (WorkUnit wu : wuList) { + if (authentications != null && authentications.size() == 1) { wu.setProp(MSTAGE_ACTIVATION_PROPERTY.toString(), getUpdatedWorkUnitActivation(wu, authentications.get(0).getAsJsonObject())); - + } // unlike activation secondary inputs, payloads will be processed in each work unit // and payloads will not be loaded until the Connection executes the command wu.setProp(MSTAGE_PAYLOAD_PROPERTY.toString(), payloads); - } } return wuList; } - /** - * reads the multistage source to get the secondary input categories - authentication and activation - * In case the token is missing, it will retry accessing the tokens as per the retry parameters - * ("delayInSec", "retryCount") - */ - private Map readSecondaryInputs(State state, final long retries) { - LOG.info("Trying to read secondary input with retry = {}", retries); - Map secondaryInputs = readContext(state); - - // Check if authentication is ready, and if not, whether retry is required - JsonArray authentications = secondaryInputs.get(KEY_WORD_AUTHENTICATION); - if ((authentications == null || authentications.size() == 0) - && jobKeys.getIsSecondaryAuthenticationEnabled() && retries > 0) { - LOG.info("Authentication tokens are expected from secondary input, but not ready"); - LOG.info("Will wait for {} seconds and then retry reading the secondary input", jobKeys.getRetryDelayInSec()); - try { - TimeUnit.SECONDS.sleep(jobKeys.getRetryDelayInSec()); - } catch (Exception e) { - throw new RuntimeException("Sleep() interrupted", e); - } - return readSecondaryInputs(state, retries - 1); - } - LOG.info("Successfully read secondary input, no more retry"); - return secondaryInputs; - } - /** * Default multi-stage source behavior, each protocol shall override this with more concrete function * @param state WorkUnitState passed in from Gobblin framework @@ -394,44 +365,6 @@ 
private List> getDatetimePartitions(ImmutablePair structure - */ - private Map readContext(State state) { - Map secondaryInputs = new HashMap<>(); - for (JsonElement entry: jobKeys.getSecondaryInputs()) { - if (!entry.getAsJsonObject().has(KEY_WORD_PATH)) { - continue; - } - - String category = entry.getAsJsonObject().has(KEY_WORD_CATEGORY) - ? entry.getAsJsonObject().get(KEY_WORD_CATEGORY).getAsString() - : KEY_WORD_ACTIVATION; - - JsonArray categoryData = secondaryInputs.computeIfAbsent(category, x -> new JsonArray()); - if (category.equalsIgnoreCase(KEY_WORD_ACTIVATION) || category.equalsIgnoreCase(KEY_WORD_AUTHENTICATION)) { - categoryData.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject())); - } - - if (entry.getAsJsonObject().has(KEY_WORD_PATH) && category.equalsIgnoreCase(KEY_WORD_PAYLOAD)) { - categoryData.add(entry); - } - - } - return secondaryInputs; - } - /** * Get all previous highest high watermarks, by dataset URN. If a dataset URN * had multiple work units, the highest high watermark is retrieved for that @@ -477,18 +410,12 @@ private String getWorkUnitSignature( /** * retrieve the authentication data from secondary input - * TODO there is a slight inefficiency here - * @param retries number of retries remaining + * TODO this is not being used in handling HTTP 403 error * @return the authentication JsonObject */ - protected JsonObject readSecondaryAuthentication(State state, final long retries) throws InterruptedException { - Map secondaryInputs = readSecondaryInputs(state, retries); - if (secondaryInputs.containsKey(KEY_WORD_ACTIVATION) - && secondaryInputs.get(KEY_WORD_AUTHENTICATION).isJsonArray() - && secondaryInputs.get(KEY_WORD_AUTHENTICATION).getAsJsonArray().size() > 0) { - return secondaryInputs.get(KEY_WORD_AUTHENTICATION).get(0).getAsJsonObject(); - } - return new JsonObject(); + protected JsonObject readSecondaryAuthentication(State state) throws InterruptedException { + Map secondaryInputs = MSTAGE_SECONDARY_INPUT.readAuthenticationToken(state); + return secondaryInputs.get(KEY_WORD_AUTHENTICATION).get(0).getAsJsonObject(); } /** diff --git a/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java b/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java index 4275d20..558a372 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java @@ -262,4 +262,12 @@ public void testWorkUnitParallelismMax() { Assert.assertFalse(MSTAGE_WORK_UNIT_PARALLELISM_MAX.isValid(state)); } + @Test + public void testSecondaryInput() throws Exception { + SourceState state = new SourceState(); + Assert.assertTrue(MSTAGE_SECONDARY_INPUT.isValid(state)); + state.setProp("ms.secondary.input", "[{\"path\": \"dummy\", \"fields\": [\"access_token\"], \"category\": \"authentication\",\"retry\": {\"threadpool\": 5}}]"); + Assert.assertEquals((long) MSTAGE_SECONDARY_INPUT.getAuthenticationRetry(state).get("delayInSec"), 300L); + Assert.assertEquals((long) MSTAGE_SECONDARY_INPUT.getAuthenticationRetry(state).get("retryCount"), 3); + } } diff --git a/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java b/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java index da1110e..3e718ed 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java @@ -154,22 +154,6 @@ 
public void testGetDefaultFieldTypes() throws Exception {
Assert.assertEquals(method.invoke(jobkeys, state).toString(), "{testField=100}");
}
- @Test
- public void testParseSecondaryInputRetry() throws Exception {
- JobKeys jobkeys = new JobKeys();
- JsonArray input = gson.fromJson("[{\"retry\": {\"threadpool\": 5}}]", JsonArray.class);
- Method method = JobKeys.class.getDeclaredMethod("parseSecondaryInputRetry", JsonArray.class);
- method.setAccessible(true);
- Map actual = (Map) method.invoke(jobkeys, input);
- Assert.assertEquals((long) actual.get("delayInSec"), 300L);
- Assert.assertEquals((long) actual.get("retryCount"), 3);
-
- input = gson.fromJson("[{\"retry\": {\"delayInSec\": 500,\"retryCount\": 5}}]", JsonArray.class);
- actual = (Map) method.invoke(jobkeys, input);
- Assert.assertEquals((long) actual.get("delayInSec"), 500L);
- Assert.assertEquals((long) actual.get("retryCount"), 5);
- }
-
@Test
public void testGetPaginationInitialValues() throws Exception {
JobKeys jobkeys = new JobKeys();
diff --git a/cdi-core/src/test/java/com/linkedin/cdi/source/MultistageSourceTest.java b/cdi-core/src/test/java/com/linkedin/cdi/source/MultistageSourceTest.java
index 26914a6..129cb29 100644
--- a/cdi-core/src/test/java/com/linkedin/cdi/source/MultistageSourceTest.java
+++ b/cdi-core/src/test/java/com/linkedin/cdi/source/MultistageSourceTest.java
@@ -13,7 +13,6 @@ import com.linkedin.cdi.util.EndecoUtils;
import com.linkedin.cdi.util.WatermarkDefinition;
import com.linkedin.cdi.util.WorkUnitPartitionTypes;
-import com.mchange.util.AssertException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -225,25 +224,6 @@ public void testUnitWatermark(){
Assert.assertEquals(source.getWorkunits(state).size(), 3);
}
- @Test
- public void testIsSecondaryAuthenticationEnabledWithInvalidSecondaryInput() {
- JobKeys jobKeys = Mockito.mock(JobKeys.class);
- source.jobKeys = jobKeys;
- JsonArray secondaryInput = gson.fromJson("[\"test_field\"]", JsonArray.class);
- when(jobKeys.getSecondaryInputs()).thenReturn(secondaryInput);
- Assert.assertFalse(source.jobKeys.getIsSecondaryAuthenticationEnabled());
- }
-
- @Test
- public void testReadSecondaryAuthentication() throws InterruptedException {
- JsonArray secondaryInput = gson.fromJson("[{\"fields\": [\"access_token\"], \"category\": \"authentication\"}]", JsonArray.class);
- JobKeys jobKeys = Mockito.mock(JobKeys.class);
- State state = Mockito.mock(State.class);
- when(jobKeys.getSecondaryInputs()).thenReturn(secondaryInput);
- source.jobKeys = jobKeys;
- Assert.assertEquals(source.readSecondaryAuthentication(state, 1L).toString(), "{}");
- }
-
@Test
public void testGetUpdatedWorkUnitActivation() {
WorkUnit workUnit = Mockito.mock(WorkUnit.class);
diff --git a/docs/how-to/data-conversion.md b/docs/how-to/data-conversion.md
new file mode 100644
index 0000000..95c9a14
--- /dev/null
+++ b/docs/how-to/data-conversion.md
@@ -0,0 +1,77 @@
+# Config Data Transformation
+
+Data conversion for ingestion includes the following types:
+- Creating derived fields
+- Data format conversion
+- Dataset and schema tagging
+- Encrypting sensitive information
+
+## Config Derived Fields
+
+Derived fields are used in the following scenarios:
+
+- Create one or more primary or delta fields for incremental data compaction
+- Push global information down to each row to denormalize a data structure
+- Pull a nested data element up to the top row level so that it can be used as a primary or delta field
+
+Derived fields are configured through [ms.derived.fields](../parameters/ms.derived.fields.md).
+
+## Config Data Format Conversion
+
+Data format conversion includes:
+
+- Converting CSV data to JSON
+- Converting JSON data to Avro
+- Converting rows into batches of rows
+
+Data format conversion is handled by converters; the configuration property is [converter.classes](../parameters/converter.classes.md).
+
+Converters are optional, and there can be multiple converters, i.e., the number of converters can be 0 or more.
+Typical converters are:
+- org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
+- org.apache.gobblin.converter.csv.CsvToJsonConverterV2
+- com.linkedin.cdi.converter.JsonNormalizerConverter
+- com.linkedin.cdi.converter.AvroNormalizerConverter
+- org.apache.gobblin.converter.LumosAttributesConverter
+- com.linkedin.cdi.converter.InFlowValidationConverter
+- org.apache.gobblin.converter.IdentityConverter
+
+Each converter can have its own set of properties.
+
+### CSV to JSON Converter Properties
+
+- [ms.csv](../parameters/ms.csv.md) specifies the CSV attributes, such as header line position and column projection.
+
+### JSON to Avro Converter Properties
+
+- [converter.avro.date.format](../parameters/converter.avro.date.format.md), optional, only needed if there are "date" type fields
+- [converter.avro.time.format](../parameters/converter.avro.time.format.md), optional, only needed if there are "time" type fields
+- [converter.avro.timestamp.format](../parameters/converter.avro.timestamp.format.md), optional, only needed if there are "timestamp" type fields
+
+### Normalizer Properties
+
+- [ms.normalizer.batch.size](../parameters/ms.normalizer.batch.size.md), optional
+- [ms.data.explicit.eof](../parameters/ms.data.explicit.eof.md), required to be true
+
+### Data Validation Properties
+
+- [ms.validation.attributes](../parameters/ms.validation.attributes.md)
+
+## Config Dataset and Schema Tagging
+
+The tagging converters tag the ingested dataset with attributes, at the dataset level or the field level.
+
+Currently, the following properties are for dataset tagging:
+- **extract.primary.key.fields**, one or more fields that can be used as the logical primary key of the dataset. A primary
+ key field can be a nested field.
+- **extract.delta.fields**, one or more fields that can be used as the delta key of the newly extracted records so that they
+ can be merged with previously extracted records properly. Delta fields need to be of TIMESTAMP or LONG type. When a delta field is
+ of LONG type, the values need to be epoch values.
+
+## Config Field Encryption
+
+Fields that have to be stored encrypted for security can be configured through [ms.encryption.fields](../parameters/ms.encryption.fields.md).
+
+Those fields will be encrypted using the Gobblin encryption codec.
+
+[Back to Summary](summary.md#config-data-transformation)
\ No newline at end of file
diff --git a/docs/how-to/data-persistence.md b/docs/how-to/data-persistence.md
new file mode 100644
index 0000000..68e8d85
--- /dev/null
+++ b/docs/how-to/data-persistence.md
@@ -0,0 +1,29 @@
+# Config Data Persistence
+
+Data persistence includes writer and publisher configuration. The writer writes data into a working directory, and the publisher
+pushes data to the final storage, such as a data lake.
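+
+For illustration, below is a minimal sketch of a persistence configuration that writes AVRO data to an HDFS folder,
+assembled from the properties listed in this section. The URI and path values are placeholders, not verified defaults,
+and should be adjusted for the target environment:
+
+```properties
+# Standard persistence properties (placeholder values)
+data.publisher.final.dir=/jobs/myflow/final
+writer.destination.type=HDFS
+writer.fs.uri=hdfs://namenode-host:8020
+writer.dir.permissions=750
+writer.include.record.count.in.file.names=true
+writer.output.format=AVRO
+
+# Optional overrides
+writer.builder.class=gobblin.writer.AvroDataWriterBuilder
+writer.file.path.type=namespace_table
+```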
+
+To persist data into an HDFS folder in AVRO format, the job needs the following optional configurations:
+
+- `writer.builder.class=gobblin.writer.AvroDataWriterBuilder`, optional writer class, the default is the Avro data writer
+- `writer.file.path.type=namespace_table`, optionally change how the sub-folders should be structured
+- `data.publisher.replace.final.dir`, optionally truncate the directory if it is a staging area for one-time use in each execution
+
+and the following standard configurations:
+
+- `data.publisher.final.dir=/path`
+- `writer.destination.type=HDFS`
+- `writer.fs.uri=hdfs://host:port`
+- `writer.dir.permissions=750`
+- `writer.include.record.count.in.file.names=true`
+- `writer.output.format=AVRO`
+
+To persist data into ORC format, the following configurations are needed:
+
+TODO
+
+To persist data into partitioned folder structures, the following configurations are needed:
+
+TODO
+
+[Back to Summary](summary.md#config-data-persistence)
diff --git a/docs/how-to/data-processing.md b/docs/how-to/data-processing.md
index 8f49ff0..fe17118 100644
--- a/docs/how-to/data-processing.md
+++ b/docs/how-to/data-processing.md
@@ -43,5 +43,31 @@ To parse the incoming data, the job might need specify one or more of the follow
- [ms.session.key.field](../parameters/ms.session.key.field.md) if the session key is available under the specific field, for example, `ms.session.key.field="name": "records.cursor"}`
- [ms.pagination](../parameters/ms.pagination.md) if the pagination information like page start, page size, and page number etc are available, for example, `ms.pagination={"fields": ["offset", "limit"], "initialvalues": [0, 25000]}`
- [ms.output.schema](../parameters/ms.output.schema.md) if the data format cannot be reliably inferred from the actual data, for example, `ms.output.schema=[{"columnName":"s3key","isNullable":"true","dataType":{"type":"string"}}]`
+- [ms.http.response.type](../parameters/ms.http.response.type.md) if the response from the source system has a content-type other than what the extractor is expecting. The
+ default expected content-type of `JsonExtractor` is application/json, and the default expected content-type of `CsvExtractor` is application/csv.
+
+## Schema Cleansing
+
+Incoming data may have schema names that are not supported in downstream processing, including converters or writers. Invalid
+characters and white spaces can be replaced with more acceptable characters, such as "_" (underscore).
+
+For nested data, such as JSON, schema cleansing will go into the nested schema and cleanse it down to the lowest level.
+
+Schema cleansing is configured through:
+
+- [ms.schema.cleansing](../parameters/ms.schema.cleansing.md)
+
+## Column Projection
+
+Column projection allows:
+
+- Filtering out unwanted fields
+- Reordering output fields
+
+For JSON data, [ms.output.schema](../parameters/ms.output.schema.md) includes the fields to be output, and any fields not
+in the output schema will be excluded if [ms.enable.schema.based.filtering](../parameters/ms.enable.schema.based.filtering.md) is true, which
+is the default value.
+
+For CSV data, [ms.csv](../parameters/ms.csv.md) can specify which columns should be output and how.
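+
+As a sketch of schema-based column projection for JSON data, assuming the incoming records contain more fields than needed
+and only two columns should be kept (the column names `id` and `updated_at` are made up for illustration), the job could set:
+
+```properties
+# Keep only the columns listed in the output schema; schema-based filtering is true by default
+ms.enable.schema.based.filtering=true
+ms.output.schema=[{"columnName":"id","isNullable":"false","dataType":{"type":"string"}},{"columnName":"updated_at","isNullable":"true","dataType":{"type":"string"}}]
+```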
[Back to Summary](summary.md#config-data-processing)
\ No newline at end of file
diff --git a/docs/how-to/source-authentication.md b/docs/how-to/source-authentication.md
index 125e714..04449e9 100644
--- a/docs/how-to/source-authentication.md
+++ b/docs/how-to/source-authentication.md
@@ -109,4 +109,17 @@ The following is a typical OAuth2.0 authentication flow configuration:
- one or more subsequent jobs use the [token based authentication](../concepts/authentication-method.md#bearer-token-credentials) method leveraging the token from the first job through secondary input
+## Config Source and Authentication for Egress
+
+In egression, data is sent out, and a response is returned. The configuration of an egression job is
+the same as that of an ingestion job, as if the job were ingesting the response from the target system; therefore, the
+above configuration steps apply to both ingestion and egression. Put simply:
+
+**egression = ingestion of the response**
+
+### Extra Egress Configurations
+
+In egression, the payload that will be sent out is supplied through [ms.secondary.input](../parameters/ms.secondary.input.md).
+The secondary input type of "payload" indicates that the path contains files to be sent out.
+
[Back to Summary](summary.md#config-source-and-authentication)
\ No newline at end of file
diff --git a/docs/how-to/summary.md b/docs/how-to/summary.md
index 1441a8d..06350ac 100644
--- a/docs/how-to/summary.md
+++ b/docs/how-to/summary.md
@@ -5,13 +5,6 @@ The source URI can contain variables. Using variables makes it dynamic. For exam
entities from the same API endpoint, the entity ID be represented with a variable so that we just need one job configuration, not many repeating job configurations.
-# [Config an Authentication Job](authentication-job.md)
-
-The purpose of the authentication job is to retrieve an authentication token from the third party system, so that
-subsequent data extraction jobs can use the token to authenticate with the third party system. An authentication job is
-needed mostly in cases where credentials or tokens have to be refreshed in each flow execution, such as in
-the case of OAuth2.0 authentication.
-
# [Config Data Processing](data-processing.md)

For data ingestion, data processing includes decrypting, uncompressing, and parsing extracted or downloaded data. The parsing
@@ -20,6 +13,26 @@ step also includes retrieving metadata in order to decide the next action for pa
For data egression, data processing includes reading and formatting payload, and structure the egress plan through proper pagination.
+# [Config Data Transformation](data-conversion.md)
+
+Data conversion for ingestion includes the following types:
+- Creating derived fields
+- Data format conversion
+- Dataset and schema tagging
+- Encrypting sensitive information
+
+# [Config Data Persistence](data-persistence.md)
+
+Data persistence includes writer and publisher configuration. The writer writes data into a working directory, and the publisher
+pushes data to the final storage, such as a data lake.
+
+# [Config an Authentication Job](authentication-job.md)
+
+The purpose of the authentication job is to retrieve an authentication token from the third party system, so that
+subsequent data extraction jobs can use the token to authenticate with the third party system. An authentication job is
+needed mostly in cases where credentials or tokens have to be refreshed in each flow execution, such as in
+the case of OAuth2.0 authentication.
+ # [Config a Status Checking Job](status-check-job.md) A status checking job ensures the data is ready for consumption on the third party system. diff --git a/docs/parameters/categories.md b/docs/parameters/categories.md index 63473ec..5baca65 100644 --- a/docs/parameters/categories.md +++ b/docs/parameters/categories.md @@ -137,4 +137,4 @@ The following properties are inherited from Gobblin and enhanced with explicitly - [extract.table.name](extract.table.name.md) - [job.commit.policy](job.commit.policy.md) - [source.class](source.class.md) -- [converter.class](converter.class.md) \ No newline at end of file +- [converter.class](converter.classes.md) \ No newline at end of file diff --git a/docs/parameters/gobblin.converter.class.md b/docs/parameters/converter.classes.md similarity index 100% rename from docs/parameters/gobblin.converter.class.md rename to docs/parameters/converter.classes.md diff --git a/docs/parameters/deprecated.md b/docs/parameters/deprecated.md index 01f6406..d704c78 100644 --- a/docs/parameters/deprecated.md +++ b/docs/parameters/deprecated.md @@ -17,9 +17,26 @@ have this job property will have no harm. ## Factories +These factory properties are replaced by [ms.connection.client.factory](ms.connection.client.factory.md) + - ms.http.client.factory - ms.source.schema.reader.factory - ms.target.schema.reader.factory -Above factories are replaced by [ms.connection.client.factory](ms.connection.client.factory.md) - +## CSV Properties + +The following properties are replaced by [ms.csv](ms.csv.md) +- `ms.csv.column.header` +- `ms.csv.column.header.index` +- `ms.csv.column.projection` +- `ms.csv.default.field.type` +- `ms.csv.escape.character` +- `ms.csv.quote.character` +- `ms.csv.separator` +- `ms.csv.skip.lines` +- `ms.converter.csv.max.failures` +- `ms.converter.keep.null.strings` +- `csv.max.failures` + +## Other Deprecations +- `dataset.name` replaced by [extract.table.name](extract.table.name.md) \ No newline at end of file diff --git a/docs/parameters/gobblin.source.class.md b/docs/parameters/source.class.md similarity index 100% rename from docs/parameters/gobblin.source.class.md rename to docs/parameters/source.class.md diff --git a/docs/parameters/summary.md b/docs/parameters/summary.md index 9c56210..1dd2b44 100644 --- a/docs/parameters/summary.md +++ b/docs/parameters/summary.md @@ -488,4 +488,4 @@ Writers and some converters don't work without it. "full" or "successful". ## [source.class](source.class.md) -## [converter.class](converter.class.md) +## [converter.class](converter.classes.md) diff --git a/docs/sample-configs/summary.md b/docs/sample-configs/summary.md new file mode 100644 index 0000000..e69de29