Skip to content

Commit

Permalink
Allow CSV schema define columns in different order from the header
Browse files Browse the repository at this point in the history
Allow CSV schema define columns in different order from the header
  • Loading branch information
chris9692 authored Jan 5, 2022
2 parents 4c38cca + 97d6175 commit f772d9f
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import com.linkedin.cdi.util.CsvUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static com.linkedin.cdi.configuration.StaticConstants.*;


Expand Down Expand Up @@ -59,6 +59,10 @@ public boolean isValid(State state) {
if (columnProjections.trim().isEmpty()) {
return false;
}

if (expandColumnProjection(columnProjections).size() == 0) {
return false;
}
}

State tmpState = new State();
Expand Down Expand Up @@ -160,12 +164,12 @@ public Integer getColumnHeaderIndex(State state) {
return -1;
}

public String getColumnProjection(State state) {
public List<Integer> getColumnProjection(State state) {
JsonObject value = get(state);
if (value.has(COLUMN_PROJECTION)) {
return value.get(COLUMN_PROJECTION).getAsString();
return expandColumnProjection(value.get(COLUMN_PROJECTION).getAsString());
}
return StringUtils.EMPTY;
return new ArrayList<>();
}

public Long getMaxFailures(State state) {
Expand All @@ -183,4 +187,46 @@ public Boolean getKeepNullString(State state) {
}
return false;
}

/**
* Expand a column projection string into a list of indices
* @param columnProjection columns to project
* @return a list of column indices
*/
private List<Integer> expandColumnProjection(String columnProjection) {
List<Integer> expandedColumnProjection = new ArrayList<>();
if (StringUtils.isNotBlank(columnProjection)) {
for (String val : columnProjection.split(",")) {
if (val.matches("^(\\d+)-(\\d+)$")) { // range
try {
int left = Integer.parseInt(val.split("-")[0]);
int right = Integer.parseInt(val.split("-")[1]);
if (left < 0 || right < 0 || left >= right) {
return Lists.newArrayList();
} else {
for (int i = left; i <= right; i++) {
expandedColumnProjection.add(i);
}
}
} catch (Exception e) {
return Lists.newArrayList();
}
} else if (val.matches("^\\d+$")) { // single number
try {
int col = Integer.parseInt(val);
if (col < 0) {
return Lists.newArrayList();
} else {
expandedColumnProjection.add(col);
}
} catch (Exception e) {
return Lists.newArrayList();
}
} else { // unknown patterns
return Lists.newArrayList();
}
}
}
return expandedColumnProjection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ protected boolean processInputStream(long starting) {
List<String> schemaColumns = new ArrayList<>(new JsonIntermediateSchema(jobKeys.getOutputSchema())
.getColumns().keySet());
List<String> fieldNames = AvroSchemaUtils.getSchemaFieldNames(avroExtractorKeys.getAvroOutputSchema());
avroExtractorKeys.setIsValidOutputSchema(SchemaUtils.isValidOutputSchema(schemaColumns, fieldNames));
avroExtractorKeys.setIsValidOutputSchema(
SchemaUtils.isValidSchemaDefinition(schemaColumns, fieldNames, jobKeys.getDerivedFields().size()));
}
} catch (Exception e) {
LOG.error("Source Error: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ protected void initialize(ExtractorKeys keys) {
// check if user has defined the output schema
if (jobKeys.hasOutputSchema()) {
JsonArray outputSchema = jobKeys.getOutputSchema();
csvExtractorKeys.setColumnProjection(expandColumnProjection(MSTAGE_CSV.getColumnProjection(state),
outputSchema.size()));
csvExtractorKeys.setColumnProjection(MSTAGE_CSV.getColumnProjection(state));
// initialize the column name to index map based on the schema when derived fields are present
if (jobKeys.getDerivedFields().entrySet().size() > 0) {
buildColumnToIndexMap(outputSchema);
Expand All @@ -113,14 +112,7 @@ protected void setCsvExtractorKeys(CsvExtractorKeys csvExtractorKeys) {
*/
@Override
public String getSchema() {
LOG.debug("Retrieving schema definition");
JsonArray schemaArray = super.getOrInferSchema();
Assert.assertNotNull(schemaArray);
if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME,
jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) {
schemaArray.addAll(addDerivedFieldsToAltSchema());
}
return schemaArray.toString();
return getSchemaArray().toString();
}

/**
Expand Down Expand Up @@ -262,51 +254,9 @@ protected boolean processInputStream(long starting) {
protected void setRowFilter(JsonArray schemaArray) {
if (rowFilter == null) {
if (MSTAGE_ENABLE_SCHEMA_BASED_FILTERING.get(state)) {
rowFilter = new CsvSchemaBasedFilter(new JsonIntermediateSchema(schemaArray), csvExtractorKeys);
}
}
}

/**
* Expand a column projection input string
* @param columnProjection columns to project
* @param numColumnsInPredefinedSchema number of columns
* @return a set of column indices
*/
private Set<Integer> expandColumnProjection(String columnProjection, int numColumnsInPredefinedSchema) {
Set<Integer> expandedColumnProjection = new HashSet<>();
if (columnProjection != null && columnProjection.length() > 0) {
for (String val : columnProjection.split(",")) {
if (val.matches("^(\\d+)-(\\d+)$")) { // range
int left = Integer.parseInt(val.split("-")[0]);
int right = Integer.parseInt(val.split("-")[1]);
if (left < 0 || right < 0 || left >= right) {
failWorkUnit(String.format("Invalid range in column projection input %s", val));
break;
} else {
for (int i = left; i <= right; i++) {
expandedColumnProjection.add(i);
}
}
} else if (val.matches("^\\d+$")) { // single number
int col = Integer.parseInt(val);
if (col < 0) {
failWorkUnit(String.format("Invalid index in column projection input %s", val));
break;
} else {
expandedColumnProjection.add(col);
}
} else { // unknown patterns
failWorkUnit(String.format("Invalid value in column projection input %s", val));
break;
}
}

if (expandedColumnProjection.size() != numColumnsInPredefinedSchema) {
failWorkUnit("The number of columns in column projection does not match the size of the predefined schema");
rowFilter = new CsvSchemaBasedFilter(schemaArray, csvExtractorKeys);
}
}
return expandedColumnProjection;
}

/**
Expand Down Expand Up @@ -391,7 +341,8 @@ private void skipRowAndSaveHeader(Iterator<String[]> readerIterator) {
List<String> schemaColumns =
new ArrayList<>(new JsonIntermediateSchema(jobKeys.getOutputSchema()).getColumns().keySet());
List<String> headerRow = Arrays.asList(csvExtractorKeys.getHeaderRow());
csvExtractorKeys.setIsValidOutputSchema(SchemaUtils.isValidOutputSchema(schemaColumns, headerRow));
csvExtractorKeys.setIsValidOutputSchema(
SchemaUtils.isValidSchemaDefinition(schemaColumns, headerRow, jobKeys.getDerivedFields().size()));
}
}
linesRead++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,7 @@ protected void setJsonExtractorKeys(JsonExtractorKeys jsonExtractorKeys) {
*/
@Override
public JsonArray getSchema() {
LOG.debug("Retrieving schema definition");
JsonArray schemaArray = super.getOrInferSchema();
Assert.assertNotNull(schemaArray);
if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME,
jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) {
schemaArray.addAll(addDerivedFieldsToAltSchema());
}
return schemaArray;
return getSchemaArray();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.google.common.collect.ImmutableList;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.linkedin.cdi.configuration.StaticConstants;
import com.linkedin.cdi.connection.MultistageConnection;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.filter.JsonSchemaBasedFilter;
Expand Down Expand Up @@ -51,6 +53,7 @@
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static com.linkedin.cdi.configuration.StaticConstants.*;
Expand Down Expand Up @@ -974,4 +977,23 @@ protected Object endProcessingAndValidateCount() {
}
return null;
}

/**
* Add derived fields to defined schema if they are not in already.
*
* In a LKG (last known good) source schema definition, the derived fields could
* have been included in the schedule definition already, hence no action.
*
* @return schema that is structured as a JsonArray with derived fields if they are not added already
*/
protected JsonArray getSchemaArray() {
LOG.debug("Retrieving schema definition");
JsonArray schemaArray = getOrInferSchema();
Assert.assertNotNull(schemaArray);
if (jobKeys.getDerivedFields().size() > 0 && JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME,
jobKeys.getDerivedFields().keySet().iterator().next(), StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) {
schemaArray.addAll(addDerivedFieldsToAltSchema());
}
return schemaArray;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

package com.linkedin.cdi.filter;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.linkedin.cdi.keys.CsvExtractorKeys;
import com.linkedin.cdi.util.JsonIntermediateSchema;
import java.util.Arrays;
import java.util.Set;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cdi.configuration.StaticConstants.*;


/**
* Filter CSV records by Json Intermediate schema
Expand All @@ -20,31 +25,38 @@
*/
public class CsvSchemaBasedFilter extends MultistageSchemaBasedFilter<String[]> {
private static final Logger LOG = LoggerFactory.getLogger(CsvSchemaBasedFilter.class);
private CsvExtractorKeys csvExtractorKeys;
final private CsvExtractorKeys csvExtractorKeys;
final private JsonArray schemaArray;

public CsvSchemaBasedFilter(JsonIntermediateSchema schema, CsvExtractorKeys csvExtractorKeys) {
super(schema);
public CsvSchemaBasedFilter(JsonArray schema, CsvExtractorKeys csvExtractorKeys) {
super(new JsonIntermediateSchema(schema));
this.csvExtractorKeys = csvExtractorKeys;
this.schemaArray = schema;
}

@Override
public String[] filter(String[] input) {
Set<Integer> columnProjection = csvExtractorKeys.getColumnProjection();
if (columnProjection.size() > 0) {
// use user-defined column projection to filter
return filter(input, columnProjection);
} else if (csvExtractorKeys.getHeaderRow() != null && csvExtractorKeys.getIsValidOutputSchema()) {
List<Integer> columnProjection = csvExtractorKeys.getColumnProjection();

if (columnProjection.isEmpty()
&& csvExtractorKeys.getHeaderRow() != null
&& csvExtractorKeys.getIsValidOutputSchema()) {
// use the header and schema to generate column projection, then filter
String[] headerRow = csvExtractorKeys.getHeaderRow();
for (int i = 0; i < headerRow.length; i++) {
if (schema.getColumns().keySet().stream().anyMatch(headerRow[i]::equalsIgnoreCase)) {
columnProjection.add(i);
for (JsonElement column: schemaArray) {
for (int i = 0; i < headerRow.length; i++) {
if (headerRow[i].equalsIgnoreCase(column.getAsJsonObject().get(KEY_WORD_COLUMN_NAME).getAsString())) {
columnProjection.add(i);
}
}
}
csvExtractorKeys.setColumnProjection(columnProjection);
}

if (columnProjection.size() > 0) {
return filter(input, columnProjection);
} else {
LOG.debug("Defaulting to project first N columns");
LOG.info("Defaulting to project first N columns");
// take first N column, where N is the number of columns in the schema
// if the schema's size larger than input, then the extra columns will be padded with null
return Arrays.copyOf(input, schema.getColumns().size());
Expand All @@ -57,19 +69,19 @@ public String[] filter(String[] input) {
* @param columnProjection column projection
* @return modified row
*/
private String[] filter(String[] input, Set<Integer> columnProjection) {
int curr = 0;
for (int i = 0; i < input.length; i++) {
if (columnProjection.contains(i)) {
swap(input, i, curr++);
}
String[] filter(String[] input, List<Integer> columnProjection) {
if (columnProjection.size() == 0) {
return null;
}
return Arrays.copyOf(input, curr);
}

private void swap(String[] input, int i, int j) {
String temp = input[i];
input[i] = input[j];
input[j] = temp;
String[] output = new String[columnProjection.size()];
for (int i = 0; i < output.length; i++) {
if (columnProjection.get(i) < input.length) {
output[i] = input[columnProjection.get(i)];
} else {
output[i] = StringUtils.EMPTY;
}
}
return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

package com.linkedin.cdi.keys;

import com.beust.jcommander.internal.Lists;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -35,7 +37,7 @@ public class CsvExtractorKeys extends ExtractorKeys {
// This is necessary as the input stream can only be read once
private Deque<String[]> sampleRows = new ArrayDeque<>();
private String[] headerRow;
private Set<Integer> columnProjection = new HashSet<>();
private List<Integer> columnProjection = Lists.newArrayList();
private Boolean isValidOutputSchema = true;
private String defaultFieldType = StringUtils.EMPTY;

Expand Down Expand Up @@ -99,11 +101,11 @@ public void setHeaderRow(String[] headerRow) {
this.headerRow = headerRow;
}

public Set<Integer> getColumnProjection() {
public List<Integer> getColumnProjection() {
return columnProjection;
}

public void setColumnProjection(Set<Integer> columnProjection) {
public void setColumnProjection(List<Integer> columnProjection) {
this.columnProjection = columnProjection;
}

Expand Down
Loading

0 comments on commit f772d9f

Please sign in to comment.