Skip to content

Commit

Permalink
Extract Session Key code into JobKeys
Browse files Browse the repository at this point in the history
This previous commit added initial session key value code into DIL MultistageExtractor. This PR abstracts that logic into the JobKeys class.
  • Loading branch information
Frank Rodriguez committed Sep 19, 2022
1 parent 4543b1b commit 8ba95cb
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -852,12 +852,8 @@ private JsonObject getInitialWorkUnitVariableValues() {
variableValues.addProperty(entry.getKey().toString(), entry.getValue());
}

JsonObject sessionKeyField = jobKeys.getSessionKeyField();
if (Objects.nonNull(sessionKeyField)) {
if (sessionKeyField.has("initValue")) {
variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionKeyField.get("initValue").toString());
}
}
Optional<String> initialSessionValue = jobKeys.getSessionInitialValue();
initialSessionValue.ifPresent(sessionValue -> variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionValue));

return variableValues;
}
Expand Down
14 changes: 14 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
Expand Down Expand Up @@ -173,6 +175,18 @@ public String getSessionStateFailCondition() {
return retValue;
}

/**
* Return the optional initial session value if provided, otherwise return empty optional.
*/
public Optional<String> getSessionInitialValue() {
if (Objects.nonNull(sessionKeyField)) {
if (sessionKeyField.has("initValue")) {
return Optional.of(sessionKeyField.get("initValue").toString());
}
}
return Optional.empty();
}

public boolean hasSourceSchema() {
return sourceSchema.size() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void setUp() throws RetriableAuthenticationException {
// mock for source keys
when(jobKeys.getOutputSchema()).thenReturn(outputJsonSchema);
when(jobKeys.getDerivedFields()).thenReturn(new HashMap<>());
when(jobKeys.getSessionInitialValue()).thenReturn(java.util.Optional.empty());

avroExtractor = new AvroExtractor(state, multiStageSource.getJobKeys());
avroExtractor.setAvroExtractorKeys(avroExtractorKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void setUp() throws RetriableAuthenticationException {
workUnit.setProp(DATASET_URN.getConfig(), DATA_SET_URN_KEY);
when(source.getJobKeys()).thenReturn(jobKeys);
when(jobKeys.getPaginationInitValues()).thenReturn(new HashMap<>());
when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty());
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
when(jobKeys.getSchemaCleansingNullable()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void setUp() throws RetriableAuthenticationException {
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
when(jobKeys.getSchemaCleansingNullable()).thenReturn(false);
when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty());
jsonExtractorKeys = Mockito.mock(JsonExtractorKeys.class);
textDumpJsonExtractor = new TextExtractor(state, source.getJobKeys());
textDumpJsonExtractor.setJsonExtractorKeys(jsonExtractorKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.mockito.Mockito;
Expand Down

0 comments on commit 8ba95cb

Please sign in to comment.