From 8ba95cb32861a88827cd19f3a24f4ceb5bce2843 Mon Sep 17 00:00:00 2001 From: Frank Rodriguez Date: Mon, 19 Sep 2022 12:05:17 -0400 Subject: [PATCH] Extract Session Key code into JobKeys This previous commit added initial session key value code into DIL MultistageExtractor. This PR abstracts that logic into the JobKeys class. --- .../cdi/extractor/MultistageExtractor.java | 10 +++------- .../main/java/com/linkedin/cdi/keys/JobKeys.java | 14 ++++++++++++++ .../linkedin/cdi/extractor/AvroExtractorTest.java | 1 + .../linkedin/cdi/extractor/JsonExtractorTest.java | 2 ++ .../linkedin/cdi/extractor/TextExtractorTest.java | 2 ++ .../java/com/linkedin/cdi/keys/JobKeysTest.java | 1 + 6 files changed, 23 insertions(+), 7 deletions(-) diff --git a/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java b/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java index 08c84ce..8f1799d 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java @@ -38,7 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -852,12 +852,8 @@ private JsonObject getInitialWorkUnitVariableValues() { variableValues.addProperty(entry.getKey().toString(), entry.getValue()); } - JsonObject sessionKeyField = jobKeys.getSessionKeyField(); - if (Objects.nonNull(sessionKeyField)) { - if (sessionKeyField.has("initValue")) { - variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionKeyField.get("initValue").toString()); - } - } + Optional initialSessionValue = jobKeys.getSessionInitialValue(); + initialSessionValue.ifPresent(sessionValue -> variableValues.addProperty(ParameterTypes.SESSION.toString(), sessionValue)); return variableValues; } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java b/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java index 21f1e1b..ea54e72 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.State; @@ -173,6 +175,18 @@ public String getSessionStateFailCondition() { return retValue; } + /** + * Return the optional initial session value if provided, otherwise return empty optional. + */ + public Optional getSessionInitialValue() { + if (Objects.nonNull(sessionKeyField)) { + if (sessionKeyField.has("initValue")) { + return Optional.of(sessionKeyField.get("initValue").toString()); + } + } + return Optional.empty(); + } + public boolean hasSourceSchema() { return sourceSchema.size() > 0; } diff --git a/cdi-core/src/test/java/com/linkedin/cdi/extractor/AvroExtractorTest.java b/cdi-core/src/test/java/com/linkedin/cdi/extractor/AvroExtractorTest.java index 8e7fd72..1f33575 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/extractor/AvroExtractorTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/extractor/AvroExtractorTest.java @@ -112,6 +112,7 @@ public void setUp() throws RetriableAuthenticationException { // mock for source keys when(jobKeys.getOutputSchema()).thenReturn(outputJsonSchema); when(jobKeys.getDerivedFields()).thenReturn(new HashMap<>()); + when(jobKeys.getSessionInitialValue()).thenReturn(java.util.Optional.empty()); avroExtractor = new AvroExtractor(state, multiStageSource.getJobKeys()); avroExtractor.setAvroExtractorKeys(avroExtractorKeys); diff --git a/cdi-core/src/test/java/com/linkedin/cdi/extractor/JsonExtractorTest.java b/cdi-core/src/test/java/com/linkedin/cdi/extractor/JsonExtractorTest.java index 6d861ce..d2fd509 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/extractor/JsonExtractorTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/extractor/JsonExtractorTest.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.commons.lang.StringUtils; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; @@ -84,6 +85,7 @@ public void setUp() throws RetriableAuthenticationException { workUnit.setProp(DATASET_URN.getConfig(), DATA_SET_URN_KEY); when(source.getJobKeys()).thenReturn(jobKeys); when(jobKeys.getPaginationInitValues()).thenReturn(new HashMap<>()); + when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty()); when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)"); when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_"); when(jobKeys.getSchemaCleansingNullable()).thenReturn(false); diff --git a/cdi-core/src/test/java/com/linkedin/cdi/extractor/TextExtractorTest.java b/cdi-core/src/test/java/com/linkedin/cdi/extractor/TextExtractorTest.java index 2286638..1ba3301 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/extractor/TextExtractorTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/extractor/TextExtractorTest.java @@ -18,6 +18,7 @@ import java.io.InputStream; import java.util.HashMap; import java.util.List; +import java.util.Optional; import org.apache.commons.lang.StringUtils; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; @@ -73,6 +74,7 @@ public void setUp() throws RetriableAuthenticationException { when(jobKeys.getSchemaCleansingPattern()).thenReturn("(\\s|\\$|@)"); when(jobKeys.getSchemaCleansingReplacement()).thenReturn("_"); when(jobKeys.getSchemaCleansingNullable()).thenReturn(false); + when(jobKeys.getSessionInitialValue()).thenReturn(Optional.empty()); jsonExtractorKeys = Mockito.mock(JsonExtractorKeys.class); textDumpJsonExtractor = new TextExtractor(state, source.getJobKeys()); textDumpJsonExtractor.setJsonExtractorKeys(jsonExtractorKeys); diff --git a/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java b/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java index 3e718ed..ee6c110 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java @@ -15,6 +15,7 @@ import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.State; import org.mockito.Mockito;