diff --git a/cdi-core/src/main/java/com/linkedin/cdi/configuration/CsvProperties.java b/cdi-core/src/main/java/com/linkedin/cdi/configuration/CsvProperties.java index 6ff075e..f05ae3c 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/configuration/CsvProperties.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/configuration/CsvProperties.java @@ -7,11 +7,11 @@ import com.google.common.collect.Lists; import com.google.gson.JsonObject; import com.linkedin.cdi.util.CsvUtils; +import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.State; -import static com.linkedin.cdi.configuration.PropertyCollection.*; import static com.linkedin.cdi.configuration.StaticConstants.*; @@ -59,6 +59,10 @@ public boolean isValid(State state) { if (columnProjections.trim().isEmpty()) { return false; } + + if (expandColumnProjection(columnProjections).size() == 0) { + return false; + } } State tmpState = new State(); @@ -160,12 +164,12 @@ public Integer getColumnHeaderIndex(State state) { return -1; } - public String getColumnProjection(State state) { + public List getColumnProjection(State state) { JsonObject value = get(state); if (value.has(COLUMN_PROJECTION)) { - return value.get(COLUMN_PROJECTION).getAsString(); + return expandColumnProjection(value.get(COLUMN_PROJECTION).getAsString()); } - return StringUtils.EMPTY; + return new ArrayList<>(); } public Long getMaxFailures(State state) { @@ -183,4 +187,46 @@ public Boolean getKeepNullString(State state) { } return false; } + + /** + * Expand a column projection string into a list of indices + * @param columnProjection columns to project + * @return a list of column indices + */ + private List expandColumnProjection(String columnProjection) { + List expandedColumnProjection = new ArrayList<>(); + if (StringUtils.isNotBlank(columnProjection)) { + for (String val : columnProjection.split(",")) { + if (val.matches("^(\\d+)-(\\d+)$")) { // range + try { + int left = Integer.parseInt(val.split("-")[0]); + int right = Integer.parseInt(val.split("-")[1]); + if (left < 0 || right < 0 || left >= right) { + return Lists.newArrayList(); + } else { + for (int i = left; i <= right; i++) { + expandedColumnProjection.add(i); + } + } + } catch (Exception e) { + return Lists.newArrayList(); + } + } else if (val.matches("^\\d+$")) { // single number + try { + int col = Integer.parseInt(val); + if (col < 0) { + return Lists.newArrayList(); + } else { + expandedColumnProjection.add(col); + } + } catch (Exception e) { + return Lists.newArrayList(); + } + } else { // unknown patterns + return Lists.newArrayList(); + } + } + } + return expandedColumnProjection; + } } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/extractor/AvroExtractor.java b/cdi-core/src/main/java/com/linkedin/cdi/extractor/AvroExtractor.java index 7226264..8fb4dd0 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/extractor/AvroExtractor.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/extractor/AvroExtractor.java @@ -191,7 +191,8 @@ protected boolean processInputStream(long starting) { List schemaColumns = new ArrayList<>(new JsonIntermediateSchema(jobKeys.getOutputSchema()) .getColumns().keySet()); List fieldNames = AvroSchemaUtils.getSchemaFieldNames(avroExtractorKeys.getAvroOutputSchema()); - avroExtractorKeys.setIsValidOutputSchema(SchemaUtils.isValidOutputSchema(schemaColumns, fieldNames)); + avroExtractorKeys.setIsValidOutputSchema( + SchemaUtils.isValidSchemaDefinition(schemaColumns, fieldNames, jobKeys.getDerivedFields().size())); } } catch (Exception e) { LOG.error("Source Error: {}", e.getMessage()); diff --git a/cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java b/cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java index edfa17b..059e890 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java @@ -85,8 +85,7 @@ protected void initialize(ExtractorKeys keys) { // check if user has defined the output schema if (jobKeys.hasOutputSchema()) { JsonArray outputSchema = jobKeys.getOutputSchema(); - csvExtractorKeys.setColumnProjection(expandColumnProjection(MSTAGE_CSV.getColumnProjection(state), - outputSchema.size())); + csvExtractorKeys.setColumnProjection(MSTAGE_CSV.getColumnProjection(state)); // initialize the column name to index map based on the schema when derived fields are present if (jobKeys.getDerivedFields().entrySet().size() > 0) { buildColumnToIndexMap(outputSchema); @@ -113,14 +112,7 @@ protected void setCsvExtractorKeys(CsvExtractorKeys csvExtractorKeys) { */ @Override public String getSchema() { - LOG.debug("Retrieving schema definition"); - JsonArray schemaArray = super.getOrInferSchema(); - Assert.assertNotNull(schemaArray); - if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME, - jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) { - schemaArray.addAll(addDerivedFieldsToAltSchema()); - } - return schemaArray.toString(); + return getSchemaArray().toString(); } /** @@ -262,51 +254,9 @@ protected boolean processInputStream(long starting) { protected void setRowFilter(JsonArray schemaArray) { if (rowFilter == null) { if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.get(state)) { - rowFilter = new CsvSchemaBasedFilter(new JsonIntermediateSchema(schemaArray), csvExtractorKeys); - } - } - } - - /** - * Expand a column projection input string - * @param columnProjection columns to project - * @param numColumnsInPredefinedSchema number of columns - * @return a set of column indices - */ - private Set expandColumnProjection(String columnProjection, int numColumnsInPredefinedSchema) { - Set expandedColumnProjection = new HashSet<>(); - if (columnProjection != null && columnProjection.length() > 0) { - for (String val : columnProjection.split(",")) { - if (val.matches("^(\\d+)-(\\d+)$")) { // range - int left = Integer.parseInt(val.split("-")[0]); - int right = Integer.parseInt(val.split("-")[1]); - if (left < 0 || right < 0 || left >= right) { - failWorkUnit(String.format("Invalid range in column projection input %s", val)); - break; - } else { - for (int i = left; i <= right; i++) { - expandedColumnProjection.add(i); - } - } - } else if (val.matches("^\\d+$")) { // single number - int col = Integer.parseInt(val); - if (col < 0) { - failWorkUnit(String.format("Invalid index in column projection input %s", val)); - break; - } else { - expandedColumnProjection.add(col); - } - } else { // unknown patterns - failWorkUnit(String.format("Invalid value in column projection input %s", val)); - break; - } - } - - if (expandedColumnProjection.size() != numColumnsInPredefinedSchema) { - failWorkUnit("The number of columns in column projection does not match the size of the predefined schema"); + rowFilter = new CsvSchemaBasedFilter(schemaArray, csvExtractorKeys); } } - return expandedColumnProjection; } /** @@ -391,7 +341,8 @@ private void skipRowAndSaveHeader(Iterator readerIterator) { List schemaColumns = new ArrayList<>(new JsonIntermediateSchema(jobKeys.getOutputSchema()).getColumns().keySet()); List headerRow = Arrays.asList(csvExtractorKeys.getHeaderRow()); - csvExtractorKeys.setIsValidOutputSchema(SchemaUtils.isValidOutputSchema(schemaColumns, headerRow)); + csvExtractorKeys.setIsValidOutputSchema( + SchemaUtils.isValidSchemaDefinition(schemaColumns, headerRow, jobKeys.getDerivedFields().size())); } } linesRead++; diff --git a/cdi-core/src/main/java/com/linkedin/cdi/extractor/JsonExtractor.java b/cdi-core/src/main/java/com/linkedin/cdi/extractor/JsonExtractor.java index d158f62..b76d7ef 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/extractor/JsonExtractor.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/extractor/JsonExtractor.java @@ -147,14 +147,7 @@ protected void setJsonExtractorKeys(JsonExtractorKeys jsonExtractorKeys) { */ @Override public JsonArray getSchema() { - LOG.debug("Retrieving schema definition"); - JsonArray schemaArray = super.getOrInferSchema(); - Assert.assertNotNull(schemaArray); - if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME, - jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) { - schemaArray.addAll(addDerivedFieldsToAltSchema()); - } - return schemaArray; + return getSchemaArray(); } @Nullable diff --git a/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java b/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java index 4bd95c3..a9deb7c 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/extractor/MultistageExtractor.java @@ -8,7 +8,9 @@ import com.google.common.collect.ImmutableList; import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonNull; import com.google.gson.JsonObject; +import com.linkedin.cdi.configuration.StaticConstants; import com.linkedin.cdi.connection.MultistageConnection; import com.linkedin.cdi.exception.RetriableAuthenticationException; import com.linkedin.cdi.filter.JsonSchemaBasedFilter; @@ -51,6 +53,7 @@ import org.joda.time.Period; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import static com.linkedin.cdi.configuration.PropertyCollection.*; import static com.linkedin.cdi.configuration.StaticConstants.*; @@ -974,4 +977,23 @@ protected Object endProcessingAndValidateCount() { } return null; } + + /** + * Add derived fields to defined schema if they are not in already. + * + * In a LKG (last known good) source schema definition, the derived fields could + * have been included in the schedule definition already, hence no action. + * + * @return schema that is structured as a JsonArray with derived fields if they are not added already + */ + protected JsonArray getSchemaArray() { + LOG.debug("Retrieving schema definition"); + JsonArray schemaArray = getOrInferSchema(); + Assert.assertNotNull(schemaArray); + if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME, + jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) { + schemaArray.addAll(addDerivedFieldsToAltSchema()); + } + return schemaArray; + } } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/filter/CsvSchemaBasedFilter.java b/cdi-core/src/main/java/com/linkedin/cdi/filter/CsvSchemaBasedFilter.java index 2106110..9dad07b 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/filter/CsvSchemaBasedFilter.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/filter/CsvSchemaBasedFilter.java @@ -4,13 +4,18 @@ package com.linkedin.cdi.filter; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.linkedin.cdi.keys.CsvExtractorKeys; import com.linkedin.cdi.util.JsonIntermediateSchema; import java.util.Arrays; -import java.util.Set; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.cdi.configuration.StaticConstants.*; + /** * Filter CSV records by Json Intermediate schema @@ -20,31 +25,38 @@ */ public class CsvSchemaBasedFilter extends MultistageSchemaBasedFilter { private static final Logger LOG = LoggerFactory.getLogger(CsvSchemaBasedFilter.class); - private CsvExtractorKeys csvExtractorKeys; + final private CsvExtractorKeys csvExtractorKeys; + final private JsonArray schemaArray; - public CsvSchemaBasedFilter(JsonIntermediateSchema schema, CsvExtractorKeys csvExtractorKeys) { - super(schema); + public CsvSchemaBasedFilter(JsonArray schema, CsvExtractorKeys csvExtractorKeys) { + super(new JsonIntermediateSchema(schema)); this.csvExtractorKeys = csvExtractorKeys; + this.schemaArray = schema; } @Override public String[] filter(String[] input) { - Set columnProjection = csvExtractorKeys.getColumnProjection(); - if (columnProjection.size() > 0) { - // use user-defined column projection to filter - return filter(input, columnProjection); - } else if (csvExtractorKeys.getHeaderRow() != null && csvExtractorKeys.getIsValidOutputSchema()) { + List columnProjection = csvExtractorKeys.getColumnProjection(); + + if (columnProjection.isEmpty() + && csvExtractorKeys.getHeaderRow() != null + && csvExtractorKeys.getIsValidOutputSchema()) { // use the header and schema to generate column projection, then filter String[] headerRow = csvExtractorKeys.getHeaderRow(); - for (int i = 0; i < headerRow.length; i++) { - if (schema.getColumns().keySet().stream().anyMatch(headerRow[i]::equalsIgnoreCase)) { - columnProjection.add(i); + for (JsonElement column: schemaArray) { + for (int i = 0; i < headerRow.length; i++) { + if (headerRow[i].equalsIgnoreCase(column.getAsJsonObject().get(KEY_WORD_COLUMN_NAME).getAsString())) { + columnProjection.add(i); + } } } csvExtractorKeys.setColumnProjection(columnProjection); + } + + if (columnProjection.size() > 0) { return filter(input, columnProjection); } else { - LOG.debug("Defaulting to project first N columns"); + LOG.info("Defaulting to project first N columns"); // take first N column, where N is the number of columns in the schema // if the schema's size larger than input, then the extra columns will be padded with null return Arrays.copyOf(input, schema.getColumns().size()); @@ -57,19 +69,19 @@ public String[] filter(String[] input) { * @param columnProjection column projection * @return modified row */ - private String[] filter(String[] input, Set columnProjection) { - int curr = 0; - for (int i = 0; i < input.length; i++) { - if (columnProjection.contains(i)) { - swap(input, i, curr++); - } + String[] filter(String[] input, List columnProjection) { + if (columnProjection.size() == 0) { + return null; } - return Arrays.copyOf(input, curr); - } - private void swap(String[] input, int i, int j) { - String temp = input[i]; - input[i] = input[j]; - input[j] = temp; + String[] output = new String[columnProjection.size()]; + for (int i = 0; i < output.length; i++) { + if (columnProjection.get(i) < input.length) { + output[i] = input[columnProjection.get(i)]; + } else { + output[i] = StringUtils.EMPTY; + } + } + return output; } } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/keys/CsvExtractorKeys.java b/cdi-core/src/main/java/com/linkedin/cdi/keys/CsvExtractorKeys.java index 3d839df..34fc7ab 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/keys/CsvExtractorKeys.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/keys/CsvExtractorKeys.java @@ -4,11 +4,13 @@ package com.linkedin.cdi.keys; +import com.beust.jcommander.internal.Lists; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -35,7 +37,7 @@ public class CsvExtractorKeys extends ExtractorKeys { // This is necessary as the input stream can only be read once private Deque sampleRows = new ArrayDeque<>(); private String[] headerRow; - private Set columnProjection = new HashSet<>(); + private List columnProjection = Lists.newArrayList(); private Boolean isValidOutputSchema = true; private String defaultFieldType = StringUtils.EMPTY; @@ -99,11 +101,11 @@ public void setHeaderRow(String[] headerRow) { this.headerRow = headerRow; } - public Set getColumnProjection() { + public List getColumnProjection() { return columnProjection; } - public void setColumnProjection(Set columnProjection) { + public void setColumnProjection(List columnProjection) { this.columnProjection = columnProjection; } diff --git a/cdi-core/src/main/java/com/linkedin/cdi/util/SchemaUtils.java b/cdi-core/src/main/java/com/linkedin/cdi/util/SchemaUtils.java index ca35f03..ab5446c 100644 --- a/cdi-core/src/main/java/com/linkedin/cdi/util/SchemaUtils.java +++ b/cdi-core/src/main/java/com/linkedin/cdi/util/SchemaUtils.java @@ -7,7 +7,9 @@ import com.google.common.collect.Lists; import com.google.gson.JsonArray; import com.google.gson.JsonNull; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,34 +24,38 @@ private SchemaUtils() { } /** - * A schema is valid when all its valid schema are present in source and in the same order. - * Column names' matching is case insensitive. - * @param schemaColumns column names defined in the output schema + * A schema definition is valid when all defined columns are present in the source. + * - To determine existence, column names are case insensitive. + * - The order of columns can be different. + * - Defined columns can contain extra ones to allow derived fields. + * + * @param definedColumns column names defined in the output schema * @param sourceColumns column names at the source - * @return true if all columns are matching and false other wise + * @param derivedFields the number of derived fields + * @return true if first N columns are all existing in source and false other wise * * - * Example 1: schemaColumns: [A, c], sourceColumns: [a, B, C] ==> true - * Example 2: schemaColumns: [A, e], sourceColumns: [a, B, C] ==> false + * Example 1: definedColumns: [A, c], sourceColumns: [a, B, C] ==> true, B in source will be ignored in projection + * Example 2: definedColumns: [A, e], sourceColumns: [a, B, C] ==> false + * Example 3: definedColumns: [A, B, C], sourceColumns: [A, B] ==> true, C is assumed to be a derived field * */ - public static boolean isValidOutputSchema(List schemaColumns, List sourceColumns) { - int i = 0; - int j = 0; - while (i < sourceColumns.size() && j < schemaColumns.size()) { - if (sourceColumns.get(i).equalsIgnoreCase(schemaColumns.get(j))) { - j++; + public static boolean isValidSchemaDefinition( + List definedColumns, + List sourceColumns, + int derivedFields) { + Set columns = new HashSet<>(); + sourceColumns.forEach(x -> columns.add(x.toLowerCase())); + + for (int i = 0; i < definedColumns.size() - derivedFields; i++) { + if (!columns.contains(definedColumns.get(i).toLowerCase())) { + LOG.error("Defined Schema does not match source."); + LOG.error("Schema column: {}", definedColumns); + LOG.error("Source columns: {}", sourceColumns); + return false; } - i++; - } - boolean isValidSchema = j == schemaColumns.size(); - if (!isValidSchema) { - LOG.error( - "Schema columns and source columns do not match: " + "undefined columns in schema or column order mismatch"); - LOG.debug("Schema column: {}", schemaColumns); - LOG.debug("Source columns: {}", sourceColumns); } - return isValidSchema; + return true; } /** diff --git a/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java b/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java index a5f371f..29c5aa3 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java @@ -98,7 +98,7 @@ public void testCsv() { csv.addProperty("defaultFieldType", "xxx"); csv.addProperty("fieldSeparator", "u0003"); csv.addProperty("recordSeparator", "u0003"); - csv.addProperty("columnProjection", "xxx"); + csv.addProperty("columnProjection", "1,2,3"); csv.addProperty("maxFailures", 1); csv.addProperty("keepNullString", true); state.setProp("ms.csv", csv.toString()); @@ -124,6 +124,11 @@ public void testCsv() { state.setProp("ms.csv", csv.toString()); Assert.assertFalse(MSTAGE_CSV.isValid(state)); + // column projection has to be numbers or ranges of numbers + csv = new JsonObject(); + csv.addProperty("columnProjection", "x,y,z"); + state.setProp("ms.csv", csv.toString()); + Assert.assertFalse(MSTAGE_CSV.isValid(state)); } @Test diff --git a/cdi-core/src/test/java/com/linkedin/cdi/filter/CsvSchemaBasedFilterTest.java b/cdi-core/src/test/java/com/linkedin/cdi/filter/CsvSchemaBasedFilterTest.java new file mode 100644 index 0000000..5f33334 --- /dev/null +++ b/cdi-core/src/test/java/com/linkedin/cdi/filter/CsvSchemaBasedFilterTest.java @@ -0,0 +1,42 @@ +// Copyright 2021 LinkedIn Corporation. All rights reserved. +// Licensed under the BSD-2 Clause license. +// See LICENSE in the project root for license information. + +package com.linkedin.cdi.filter; + +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.linkedin.cdi.keys.CsvExtractorKeys; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class CsvSchemaBasedFilterTest { + @Test + public void testFilter() { + String[] input = "AA,BB,CC".split(","); + List columnProjection = Lists.newArrayList(0, 2, 1); + String[] output = new CsvSchemaBasedFilter(new JsonArray(), + new CsvExtractorKeys()).filter(input, columnProjection); + Assert.assertEquals(output[0], input[0]); + Assert.assertEquals(output[1], input[2]); + Assert.assertEquals(output[2], input[1]); + + columnProjection = Lists.newArrayList(0, 2, 1, 0); + output = new CsvSchemaBasedFilter(new JsonArray(), + new CsvExtractorKeys()).filter(input, columnProjection); + Assert.assertEquals(output[0], input[0]); + Assert.assertEquals(output[1], input[2]); + Assert.assertEquals(output[2], input[1]); + Assert.assertEquals(output[3], input[0]); + + columnProjection = Lists.newArrayList(0, 1, 2, 3); + output = new CsvSchemaBasedFilter(new JsonArray(), + new CsvExtractorKeys()).filter(input, columnProjection); + Assert.assertTrue(output[3].isEmpty()); + + Assert.assertNull(new CsvSchemaBasedFilter(new JsonArray(), + new CsvExtractorKeys()).filter(input, Lists.newArrayList())); + } +} diff --git a/cdi-core/src/test/java/com/linkedin/cdi/util/SchemaUtilsTest.java b/cdi-core/src/test/java/com/linkedin/cdi/util/SchemaUtilsTest.java index a252a93..b84d0b1 100644 --- a/cdi-core/src/test/java/com/linkedin/cdi/util/SchemaUtilsTest.java +++ b/cdi-core/src/test/java/com/linkedin/cdi/util/SchemaUtilsTest.java @@ -18,25 +18,30 @@ public class SchemaUtilsTest { @Test public void testIsValidOutputSchema() { - // valid schema + // valid schema, subset same order List schemaColumns = Arrays.asList("a", "b"); List sourceColumns = Arrays.asList("a", "B", "C"); - Assert.assertTrue(SchemaUtils.isValidOutputSchema(schemaColumns, sourceColumns)); + Assert.assertTrue(SchemaUtils.isValidSchemaDefinition(schemaColumns, sourceColumns, 0)); - // valid schema + // valid schema, subset with derived fields + schemaColumns = Arrays.asList("a", "b", "x"); + sourceColumns = Arrays.asList("a", "b"); + Assert.assertTrue(SchemaUtils.isValidSchemaDefinition(schemaColumns, sourceColumns, 1)); + + // valid schema, subset with skipped columns schemaColumns = Arrays.asList("a", "c"); sourceColumns = Arrays.asList("a", "B", "C"); - Assert.assertTrue(SchemaUtils.isValidOutputSchema(schemaColumns, sourceColumns)); + Assert.assertTrue(SchemaUtils.isValidSchemaDefinition(schemaColumns, sourceColumns, 0)); - // some columns in the schema is nowhere to be found in the source + // some columns in the schema is not in the source schemaColumns = Arrays.asList("a", "e"); sourceColumns = Arrays.asList("a", "B", "C"); - Assert.assertFalse(SchemaUtils.isValidOutputSchema(schemaColumns, sourceColumns)); + Assert.assertFalse(SchemaUtils.isValidSchemaDefinition(schemaColumns, sourceColumns, 0)); - // order mismatch + // order mismatch is allowed schemaColumns = Arrays.asList("c", "a", "b"); sourceColumns = Arrays.asList("a", "B", "C"); - Assert.assertFalse(SchemaUtils.isValidOutputSchema(schemaColumns, sourceColumns)); + Assert.assertTrue(SchemaUtils.isValidSchemaDefinition(schemaColumns, sourceColumns, 0)); } @Test