Skip to content

Commit

Permalink
Merge pull request #27 from chris9692/master
Browse files Browse the repository at this point in the history
Allow variables in payload path and documentation change
  • Loading branch information
yogi1324 authored Nov 23, 2021
2 parents b4fba07 + e6f8d2a commit fd53653
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface StaticConstants {
String EXCEPTION_INCORRECT_CONFIGURATION = "Property %s has incorrect configuration: %s, see: %s";
String EXCEPTION_DEPRECATED_CONFIGURATION = "Property %s has been deprecated, and the replacement is: %s, see: %s";

String ERROR_READING_SECONDARY_INPUT = "Error reading %s secondary input for work unit %s";

String MSG_ROWS_PROCESSED = "Processed %s records, work unit: %s";
String MSG_WORK_UNIT_ALWAYS = "There should be a work unit.";
String MSG_LOW_WATER_MARK_ALWAYS = "There should be a low watermark.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.cdi.util.DateTimeUtils;
import com.linkedin.cdi.util.JsonUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -139,7 +140,9 @@ public List<String> getUnits(State state) {
}
return unitList;
} else if (units.isJsonPrimitive()) {
return Lists.newArrayList(units.getAsString().split(KEY_WORD_COMMA));
List<String> unitList = Lists.newArrayList();
Arrays.stream(units.getAsString().split(KEY_WORD_COMMA)).forEach(x -> unitList.add(x.trim()));
return unitList;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ public void setEof(Boolean eof) {
this.eof = eof;
}

public Iterator<JsonElement> getPayloadIterator() {
return payloadIterator;
}

public void setPayloadIterator(Iterator<JsonElement> payloadIterator) {
this.payloadIterator = payloadIterator;
}

/**
 * Returns the extractor keys currently held by this extractor.
 *
 * @return the value of the {@code extractorKeys} field
 */
public ExtractorKeys getExtractorKeys() {
  return this.extractorKeys;
}
Expand Down Expand Up @@ -183,8 +175,7 @@ protected void initialize(ExtractorKeys keys) {
extractorKeys.setExplictEof(MSTAGE_DATA_EXPLICIT_EOF.get(state));
extractorKeys.setSignature(DATASET_URN.get(state));
extractorKeys.setPreprocessors(getPreprocessors(state));
extractorKeys.setPayloads(getPayloads(state));
payloadIterator = extractorKeys.getPayloads().iterator();
readPayloads(state);
extractorKeys.logDebugAll(state.getWorkunit());
}

Expand Down Expand Up @@ -795,12 +786,15 @@ public boolean closeConnection() {
*/
protected JsonObject getInitialWorkUnitParameters() {
JsonObject definedParameters =
JsonParameter.getParametersAsJson(jobKeys.getSourceParameters().toString(), getInitialWorkUnitVariableValues(),
JsonParameter.getParametersAsJson(MSTAGE_PARAMETERS.get(this.state).toString(), getInitialWorkUnitVariableValues(),
this.state);
JsonObject initialParameters = replaceVariablesInParameters(appendActivationParameter(definedParameters));
if (this.payloadIterator.hasNext()) {

// payload Iterator might not have been initialized yet
if (payloadIterator != null && payloadIterator.hasNext()) {
initialParameters.add("payload", payloadIterator.next());
}

return initialParameters;
}

Expand Down Expand Up @@ -869,10 +863,11 @@ private JsonObject appendActivationParameter(JsonObject parameters) {
}

protected JsonObject getCurrentWorkUnitParameters() {
JsonObject definedParameters = JsonParameter.getParametersAsJson(jobKeys.getSourceParameters().toString(),
JsonObject definedParameters = JsonParameter.getParametersAsJson(MSTAGE_PARAMETERS.get(state).toString(),
getUpdatedWorkUnitVariableValues(getInitialWorkUnitVariableValues()), state);
JsonObject currentParameters = replaceVariablesInParameters(appendActivationParameter(definedParameters));
if (this.payloadIterator.hasNext()) {

if (payloadIterator != null && payloadIterator.hasNext()) {
currentParameters.add("payload", payloadIterator.next());
}
return currentParameters;
Expand Down Expand Up @@ -941,15 +936,21 @@ private JsonObject getUpdatedWorkUnitVariableValues(JsonObject initialVariableVa
* override this to process payload differently.
*
* @param state WorkUnitState
* @return the payload records
*/
protected JsonArray getPayloads(State state) {
JsonArray payloads = MSTAGE_PAYLOAD_PROPERTY.get(state);
protected void readPayloads(State state) {
JsonArray payloads = MSTAGE_PAYLOAD_PROPERTY.get(state, getInitialWorkUnitParameters());
JsonArray records = new JsonArray();
for (JsonElement entry : payloads) {
records.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject()));
try {
records.addAll(new HdfsReader(state).readSecondary(entry.getAsJsonObject()));
extractorKeys.setPayloads(records);
payloadIterator = records.iterator();
} catch (Exception e) {
// in exception, put payload definition as payload, keep iterator as null
LOG.error(String.format(ERROR_READING_SECONDARY_INPUT, KEY_WORD_PAYLOAD, DATASET_URN.get(state)));
extractorKeys.setPayloads(payloads);
}
}
return records;
}

/**
Expand Down
10 changes: 0 additions & 10 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public class JobKeys {

private JsonObject sessionKeyField = new JsonObject();
private String totalCountField = StringUtils.EMPTY;
private JsonArray sourceParameters = new JsonArray();
private Map<ParameterTypes, String> paginationFields = new HashMap<>();
private Map<ParameterTypes, Long> paginationInitValues = new HashMap<>();
private long sessionTimeout;
Expand All @@ -90,7 +89,6 @@ public void initialize(State state) {
parsePaginationInitialValues(state);
setSessionKeyField(MSTAGE_SESSION_KEY_FIELD.get(state));
setTotalCountField(MSTAGE_TOTAL_COUNT_FIELD.get(state));
setSourceParameters(MSTAGE_PARAMETERS.get(state));
setSourceUri(MSTAGE_SOURCE_URI.get(state));
setDefaultFieldTypes(parseDefaultFieldTypes(state));
setDerivedFields(parseDerivedFields(state));
Expand Down Expand Up @@ -578,14 +576,6 @@ public void setTotalCountField(String totalCountField) {
this.totalCountField = totalCountField;
}

public JsonArray getSourceParameters() {
return sourceParameters;
}

public void setSourceParameters(JsonArray sourceParameters) {
this.sourceParameters = sourceParameters;
}

/**
 * Returns the pagination fields held by these job keys.
 *
 * @return the value of the {@code paginationFields} field, a map from parameter type to a string
 */
public Map<ParameterTypes, String> getPaginationFields() {
  return this.paginationFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static com.linkedin.cdi.configuration.StaticConstants.*;
import static com.linkedin.cdi.util.WatermarkDefinition.WatermarkTypes.*;


/**
Expand Down Expand Up @@ -119,7 +120,6 @@ public List<WorkUnit> getWorkunits(SourceState state) {
// Parse watermark settings if defined
List<WatermarkDefinition> definedWatermarks = Lists.newArrayList();
for (JsonElement definitionJson : jobKeys.getWatermarkDefinition()) {
Assert.assertTrue(definitionJson.isJsonObject());
definedWatermarks.add(new WatermarkDefinition(
definitionJson.getAsJsonObject(), jobKeys.getIsPartialPartition(),
jobKeys.getWorkUnitPartitionType()));
Expand All @@ -130,7 +130,10 @@ public List<WorkUnit> getWorkunits(SourceState state) {
JsonArray activations = secondaryInputs.computeIfAbsent(KEY_WORD_ACTIVATION, x -> new JsonArray());
JsonArray payloads = secondaryInputs.computeIfAbsent(KEY_WORD_PAYLOAD, x -> new JsonArray());

if (activations.size() == 0 && payloads.size() != 0) {
// create a dummy activation if there is no activation secondary input nor defined unit watermark
if (activations.size() == 0
&& definedWatermarks.stream().noneMatch(x -> x.getType().equals(UNIT))
&& payloads.size() != 0) {
JsonObject simpleActivation = new JsonObject();
activations.add(simpleActivation);
}
Expand Down Expand Up @@ -212,7 +215,7 @@ List<WorkUnit> generateWorkUnits(List<WatermarkDefinition> definitions, Map<Stri
}
datetimeWatermark = wmd;
}
if (wmd.getType() == WatermarkDefinition.WatermarkTypes.UNIT) {
if (wmd.getType() == UNIT) {
if (unitWatermark != null) {
throw new RuntimeException("1 and only unit type watermark is allowed"
+ ", including the unit watermark generated from secondary input.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public WatermarkDefinition setUnits(String name, String commaSeparatedUnits) {
List<String> units = Lists.newArrayList(commaSeparatedUnits.split(StringUtils.COMMA_STR));
for (String unit: units) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(name, unit);
jsonObject.addProperty(name, unit.trim());
unitArray.add(jsonObject);
}
this.setUnits(unitArray.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.configuration;

import com.google.gson.JsonArray;
import com.linkedin.cdi.extractor.MultistageExtractor;
import com.linkedin.cdi.keys.JobKeys;
import com.linkedin.cdi.source.HdfsSource;
import gobblin.runtime.JobState;
import java.util.List;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.testng.Assert;
import org.testng.annotations.Test;

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static com.linkedin.cdi.configuration.StaticConstants.*;


/**
 * Tests for secondary input properties whose payload path contains a variable.
 */
public class SecondaryInputPropertiesTest {

  /**
   * Configures a payload secondary input whose path is the variable {@code {{customerId}}} together
   * with a unit watermark named {@code customerId}, then checks that the extractors created for the
   * first two work units expose exactly one payload entry each, with paths {@code dir1} and
   * {@code dir2} respectively.
   *
   * <p>The units are configured as {@code "dir1, dir2, dir3"} (with blanks after the commas) while
   * the expected paths carry no surrounding whitespace.
   */
  @Test
  public void testVariableInPayloadPath() {
    SourceState jobConfig = new SourceState();
    jobConfig.setProp("extract.table.name", "xxx");
    jobConfig.setProp(
        "ms.secondary.input",
        "[{\"path\": \"{{customerId}}\", \"fields\": [\"dummy\"], \"category\": \"payload\"}]");
    jobConfig.setProp("ms.source.uri", "/data/test?RE=.*");
    jobConfig.setProp(
        "ms.watermark",
        "[{\"name\":\"customerId\",\"type\":\"unit\",\"units\":\"dir1, dir2, dir3\"}]");
    jobConfig.setProp(
        MSTAGE_CONNECTION_CLIENT_FACTORY.getConfig(),
        "com.linkedin.cdi.factory.DefaultConnectionClientFactory");

    Assert.assertTrue(new JobKeys().validate(jobConfig));

    HdfsSource hdfsSource = new HdfsSource();
    List<WorkUnit> workUnits = hdfsSource.getWorkunits(jobConfig);

    // 1st work unit
    assertSinglePayloadPath(hdfsSource, workUnits.get(0), "dir1");

    // 2nd work unit
    assertSinglePayloadPath(hdfsSource, workUnits.get(1), "dir2");
  }

  /**
   * Builds an extractor for the given work unit and asserts that its extractor keys hold exactly
   * one payload entry whose path field equals the expected value.
   *
   * @param source the source used to create the extractor
   * @param workUnit the work unit to wrap in a fresh work unit state
   * @param expectedPath the path expected in the single payload entry
   */
  private static void assertSinglePayloadPath(HdfsSource source, WorkUnit workUnit, String expectedPath) {
    WorkUnitState workUnitState = new WorkUnitState(workUnit, new JobState());
    workUnitState.setProp("ms.extractor.class", "com.linkedin.cdi.extractor.CsvExtractor");

    MultistageExtractor extractor = (MultistageExtractor) source.getExtractor(workUnitState);
    Assert.assertNotNull(extractor);

    JsonArray payloadEntries = extractor.getExtractorKeys().getPayloads();
    Assert.assertEquals(payloadEntries.size(), 1);

    String actualPath = payloadEntries.get(0).getAsJsonObject().get(KEY_WORD_PATH).getAsString();
    Assert.assertEquals(actualPath, expectedPath);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public void testGetNext() throws RetriableAuthenticationException {
JobKeys jobKeys = Mockito.mock(JobKeys.class);
when(jobKeys.getCallInterval()).thenReturn(1L);
conn.setJobKeys(jobKeys);
when(jobKeys.getSourceParameters()).thenReturn(new JsonArray());
when(jobKeys.getCallInterval()).thenThrow(Mockito.mock(IllegalArgumentException.class));
conn.executeNext(workUnitStatus);
Assert.assertEquals(conn.executeNext(workUnitStatus), workUnitStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ public void testMSDataField1() throws Exception {
realHttpSource.getWorkunits(sourceState);
avroExtractor.jobKeys = jobKeys;
avroExtractor.setAvroExtractorKeys(new AvroExtractorKeys());
when(jobKeys.getSourceParameters()).thenReturn(realHttpSource.getJobKeys().getSourceParameters());
when(jobKeys.getDataField()).thenReturn("results");
when(multistageConnection.executeFirst(avroExtractor.workUnitStatus)).thenReturn(status);

Expand Down Expand Up @@ -433,7 +432,6 @@ public void testMSDataField2() throws Exception {
realHttpSource.getWorkunits(sourceState);
avroExtractor.jobKeys = jobKeys;
avroExtractor.setAvroExtractorKeys(new AvroExtractorKeys());
when(jobKeys.getSourceParameters()).thenReturn(realHttpSource.getJobKeys().getSourceParameters());
when(jobKeys.getDataField()).thenReturn("results");
when(multistageConnection.executeFirst(avroExtractor.workUnitStatus)).thenReturn(status);

Expand Down Expand Up @@ -517,7 +515,6 @@ public void testMSDataField3() throws Exception {
realHttpSource.getWorkunits(sourceState);
avroExtractor.jobKeys = jobKeys;
avroExtractor.setAvroExtractorKeys(new AvroExtractorKeys());
when(jobKeys.getSourceParameters()).thenReturn(realHttpSource.getJobKeys().getSourceParameters());
when(jobKeys.getDataField()).thenReturn("results.0.wrapper.field1");
when(multistageConnection.executeFirst(avroExtractor.workUnitStatus)).thenReturn(status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public void setUp() throws RetriableAuthenticationException {
when(state.getWorkunit()).thenReturn(workUnit);
workUnit.setProp(DATASET_URN.getConfig(), DATA_SET_URN_KEY);
when(source.getJobKeys()).thenReturn(jobKeys);
when(jobKeys.getSourceParameters()).thenReturn(new JsonArray());
when(jobKeys.getPaginationInitValues()).thenReturn(new HashMap<>());
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,30 @@

package com.linkedin.cdi.extractor;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.connection.MultistageConnection;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.keys.JobKeys;
import com.linkedin.cdi.keys.JsonExtractorKeys;
import com.linkedin.cdi.source.HttpSource;
import com.linkedin.cdi.source.MultistageSource;
import com.linkedin.cdi.util.JsonUtils;
import com.linkedin.cdi.util.ParameterTypes;
import com.linkedin.cdi.util.SchemaBuilder;
import com.linkedin.cdi.util.WorkUnitStatus;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static com.linkedin.cdi.configuration.PropertyCollection.*;

import static com.linkedin.cdi.configuration.MultistageProperties.*;
import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static org.mockito.Mockito.*;


Expand Down Expand Up @@ -87,7 +69,6 @@ public void setUp() throws RetriableAuthenticationException {
when(state.getWorkunit()).thenReturn(workUnit);
workUnit.setProp(DATASET_URN.getConfig(), DATA_SET_URN_KEY);
when(source.getJobKeys()).thenReturn(jobKeys);
when(jobKeys.getSourceParameters()).thenReturn(new JsonArray());
when(jobKeys.getPaginationInitValues()).thenReturn(new HashMap<>());
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,8 @@ public void testSourceParameters(){
sourceState.setProp(MSTAGE_OUTPUT_SCHEMA.getConfig(), "");
MultistageSource source = new MultistageSource();
source.getWorkunits(sourceState);
Assert.assertNotNull(source.getJobKeys().getSourceParameters());

sourceState.setProp("ms.parameters", "[{\"name\":\"cursor\",\"type\":\"session\"}]");
source.getWorkunits(sourceState);
Assert.assertNotNull(source.getJobKeys().getSourceParameters());
}

@Test
Expand Down
12 changes: 9 additions & 3 deletions docs/concepts/work-unit.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Work Unit

"Time" watermarks can generate partitions, and "unit" watermarks have units.
A time partition and an activation unit make a work unit. DIL
maintains execution states, including watermarks, for each work unit.

Time watermarks and unit watermarks together create work units, and DIL
maintains execution state, including watermarks, for each work unit.
A "time" watermark can generate partitions. Time partitions are defined by
[ms.watermark](../parameters/ms.watermark.md) and
[ms.work.unit.partition](../parameters/ms.work.unit.partition.md).

Activation units can have 2 sources:
- units of a unit watermark defined in [ms.watermark](../parameters/ms.watermark.md)
- activation entries of a secondary input from [ms.secondary.input](../parameters/ms.secondary.input.md)

Partitions and Units make a matrix. Assuming we have m periods and n units,
the matrix will be n x m. **Each combination of partitions and units makes
Expand Down
Loading

0 comments on commit fd53653

Please sign in to comment.