Skip to content

Commit

Permalink
Merge pull request #76 from frodr33/master
Browse files Browse the repository at this point in the history
Extract Session Key code into JobKeys
  • Loading branch information
booddu authored Sep 19, 2022
2 parents 4543b1b + 8ba95cb commit 6177daf
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -852,12 +852,8 @@ private JsonObject getInitialWorkUnitVariableValues() {
variableValues.addProperty(entry.getKey().toString(), entry.getValue());
}

JsonObject sessionKeyField = jobKeys.getSessionKeyField();
if (Objects.nonNull(sessionKeyField)) {
if (sessionKeyField.has("initValue")) {
variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionKeyField.get("initValue").toString());
}
}
Optional<String> initialSessionValue = jobKeys.getSessionInitialValue();
initialSessionValue.ifPresent(sessionValue -> variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionValue));

return variableValues;
}
Expand Down
14 changes: 14 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
Expand Down Expand Up @@ -173,6 +175,18 @@ public String getSessionStateFailCondition() {
return retValue;
}

/**
* Return the optional initial session value if provided, otherwise return empty optional.
*/
public Optional<String> getSessionInitialValue() {
if (Objects.nonNull(sessionKeyField)) {
if (sessionKeyField.has("initValue")) {
return Optional.of(sessionKeyField.get("initValue").toString());
}
}
return Optional.empty();
}

public boolean hasSourceSchema() {
return sourceSchema.size() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void setUp() throws RetriableAuthenticationException {
// mock for source keys
when(jobKeys.getOutputSchema()).thenReturn(outputJsonSchema);
when(jobKeys.getDerivedFields()).thenReturn(new HashMap<>());
when(jobKeys.getSessionInitialValue()).thenReturn(java.util.Optional.empty());

avroExtractor = new AvroExtractor(state, multiStageSource.getJobKeys());
avroExtractor.setAvroExtractorKeys(avroExtractorKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void setUp() throws RetriableAuthenticationException {
workUnit.setProp(DATASET_URN.getConfig(), DATA_SET_URN_KEY);
when(source.getJobKeys()).thenReturn(jobKeys);
when(jobKeys.getPaginationInitValues()).thenReturn(new HashMap<>());
when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty());
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
when(jobKeys.getSchemaCleansingNullable()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void setUp() throws RetriableAuthenticationException {
when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)");
when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_");
when(jobKeys.getSchemaCleansingNullable()).thenReturn(false);
when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty());
jsonExtractorKeys = Mockito.mock(JsonExtractorKeys.class);
textDumpJsonExtractor = new TextExtractor(state, source.getJobKeys());
textDumpJsonExtractor.setJsonExtractorKeys(jsonExtractorKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.mockito.Mockito;
Expand Down

0 comments on commit 6177daf

Please sign in to comment.