Upgrade Gobblin to 0.17.0-dev-174
chris9692 authored Jan 10, 2022
2 parents f772d9f + 4fb30c3 commit e56cfbe
Showing 6 changed files with 62 additions and 48 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -6,7 +6,7 @@ buildscript {
 ext.avroVersion = '1.8.1'
 ext.awsVersion = '2.10.15'
 ext.findBugsVersion = '3.0.0'
-ext.gobblinVersion = '0.15.0-dev-9467'
+ext.gobblinVersion = '0.17.0-dev-174'
 ext.hadoopVersion = '2.3.0'
 ext.hiveVersion = '1.0.1'
 ext.javaVersion = JavaVersion.VERSION_1_8
2 changes: 2 additions & 0 deletions cdi-core/build.gradle
@@ -34,6 +34,7 @@ dependencies {
 compile externalDependency.'commonsLang3'
 compile externalDependency.'testng'
 compile externalDependency.'jhyde'
+compile externalDependency.'opencsv'

 runtime externalDependency.'gobblin-azkaban'
 runtime externalDependency.'gobblin-kafka-08'

@@ -75,6 +76,7 @@ configurations {
 all*.exclude group: 'ch.qos.logback'
 all*.exclude group: 'com.ibm.icu', module: 'icu4j'
 all*.exclude group: 'org.pentaho'
+all*.exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
 }

 shadowJar {
cdi-core/src/main/java/com/linkedin/cdi/extractor/CsvExtractor.java

@@ -5,23 +5,21 @@
 package com.linkedin.cdi.extractor;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Doubles;
 import com.google.common.primitives.Floats;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
 import com.linkedin.cdi.configuration.StaticConstants;
 import com.linkedin.cdi.filter.CsvSchemaBasedFilter;
 import com.linkedin.cdi.keys.CsvExtractorKeys;
 import com.linkedin.cdi.keys.ExtractorKeys;
 import com.linkedin.cdi.keys.JobKeys;
 import com.linkedin.cdi.preprocessor.InputStreamProcessor;
 import com.linkedin.cdi.preprocessor.StreamProcessor;
 import com.linkedin.cdi.util.JsonIntermediateSchema;
 import com.linkedin.cdi.util.JsonUtils;
 import com.linkedin.cdi.util.SchemaBuilder;
 import com.linkedin.cdi.util.SchemaUtils;
 import com.opencsv.CSVParser;
@@ -46,7 +44,6 @@
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;

 import static com.linkedin.cdi.configuration.PropertyCollection.*;
 import static com.linkedin.cdi.configuration.StaticConstants.*;
@@ -151,11 +148,14 @@ public String[] readRecord(String[] reuse) {
 String[] row = readerIterator.next();
 CsvSchemaBasedFilter csvSchemaBasedFilter = (CsvSchemaBasedFilter) rowFilter;
 if (csvSchemaBasedFilter != null) {
-  row = csvSchemaBasedFilter.filter(row);
-  // when column projection is specified, the filter data should be the same size as the column projection
-  if (csvExtractorKeys.getColumnProjection().size() > 0 && row.length != csvExtractorKeys.getColumnProjection()
-      .size()) {
-    failWorkUnit("Some indicies in column projection are out of bound");
+  try {
+    if (csvExtractorKeys.getColumnProjection().isEmpty()
+        && csvExtractorKeys.getHeaderRow() != null) {
+      csvExtractorKeys.setColumnProjection(mapColumnsDynamically(this.getSchemaArray()));
+    }
+    row = csvSchemaBasedFilter.filter(row);
+  } catch (Exception e) {
+    failWorkUnit("CSV column projection error");
   }
 }
 return addDerivedFields(row);
@@ -449,4 +449,31 @@ protected boolean hasNext() {
 protected boolean isFirst(long starting) {
   return csvExtractorKeys.getCsvIterator() == null;
 }
+
+/**
+ * Dynamically map column index to defined schema
+ * This dynamic column projection should be called no more than once for each batch
+ * @param schemaArray defined schema array
+ * @return dynamically mapped column projection
+ */
+private List<Integer> mapColumnsDynamically(JsonArray schemaArray) {
+  if (!csvExtractorKeys.getColumnProjection().isEmpty()) {
+    return csvExtractorKeys.getColumnProjection();
+  }
+
+  List<Integer> columnProjection = Lists.newArrayList();
+  if (csvExtractorKeys.getHeaderRow() != null
+      && csvExtractorKeys.getIsValidOutputSchema()) {
+    // use the header and schema to generate column projection, then filter
+    String[] headerRow = csvExtractorKeys.getHeaderRow();
+    for (JsonElement column: schemaArray) {
+      for (int i = 0; i < headerRow.length; i++) {
+        if (headerRow[i].equalsIgnoreCase(column.getAsJsonObject().get(KEY_WORD_COLUMN_NAME).getAsString())) {
+          columnProjection.add(i);
+        }
+      }
+    }
+  }
+  return columnProjection;
+}
 }
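For illustration, a minimal standalone sketch of the header-to-schema matching that mapColumnsDynamically performs. The header row and schema literal are invented for the example, and the literal "columnName" is assumed here for KEY_WORD_COLUMN_NAME (it matches the key used in the TEXT_EXTRACTOR_SCHEMA string in TextExtractor below).

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.List;

public class ColumnProjectionDemo {
  // Same matching loop as mapColumnsDynamically: for each schema column, record the
  // index of the header column with a case-insensitive name match.
  // "columnName" is assumed for KEY_WORD_COLUMN_NAME.
  static List<Integer> mapColumns(String[] headerRow, JsonArray schemaArray) {
    List<Integer> projection = new ArrayList<>();
    for (JsonElement column : schemaArray) {
      for (int i = 0; i < headerRow.length; i++) {
        if (headerRow[i].equalsIgnoreCase(column.getAsJsonObject().get("columnName").getAsString())) {
          projection.add(i);
        }
      }
    }
    return projection;
  }

  public static void main(String[] args) {
    String[] header = {"id", "name", "age"};  // invented CSV header
    JsonArray schema = new JsonParser()
        .parse("[{\"columnName\":\"name\"},{\"columnName\":\"id\"}]").getAsJsonArray();
    System.out.println(mapColumns(header, schema));  // [1, 0]
  }
}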
cdi-core/src/main/java/com/linkedin/cdi/extractor/TextExtractor.java

@@ -26,8 +26,6 @@
 import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,15 +35,18 @@
 /**
  * TextExtractor takes an InputStream, applies proper preprocessors, and returns a String output
  */
-@Slf4j
 public class TextExtractor extends MultistageExtractor<JsonArray, JsonObject> {
-  private static final Logger logger = LoggerFactory.getLogger(TextExtractor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TextExtractor.class);

   private final static int TEXT_EXTRACTOR_BYTE_LIMIT = 1048576;
   private final static int BUFFER_SIZE = 8192;
   private final static String TEXT_EXTRACTOR_SCHEMA =
       "[{\"columnName\":\"output\",\"isNullable\":true,\"dataType\":{\"type\":\"string\"}}]";
-  @Getter
+
+  public JsonExtractorKeys getJsonExtractorKeys() {
+    return jsonExtractorKeys;
+  }

   private JsonExtractorKeys jsonExtractorKeys = new JsonExtractorKeys();

   public TextExtractor(WorkUnitState state, JobKeys jobKeys) {
@@ -100,7 +101,7 @@ public JsonObject readRecord(JsonObject reuse) {
 this.jsonExtractorKeys.setTotalCount(1);
 StringBuffer output = new StringBuffer();
 if (workUnitStatus.getBuffer() == null) {
-  logger.warn("Received a NULL InputStream, end the work unit");
+  LOG.warn("Received a NULL InputStream, end the work unit");
   return null;
 } else {
   try {
@@ -118,7 +119,7 @@ public JsonObject readRecord(JsonObject reuse) {
 JsonObject outputJson = addDerivedFields(jsonObject);
 return outputJson;
 } catch (Exception e) {
-  logger.error("Error while extracting from source or writing to target", e);
+  LOG.error("Error while extracting from source or writing to target", e);
   this.state.setWorkingState(WorkUnitState.WorkingState.FAILED);
   return null;
 }
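To make the record shape concrete: readRecord wraps the accumulated text into a single JSON record whose only field matches TEXT_EXTRACTOR_SCHEMA. The jsonObject consumed above is built in lines GitHub elided, presumably along these lines; the sample text is invented.

import com.google.gson.JsonObject;

public class TextRecordDemo {
  public static void main(String[] args) {
    StringBuffer output = new StringBuffer("hello world");  // stand-in for the streamed text
    JsonObject record = new JsonObject();
    record.addProperty("output", output.toString());  // field name from TEXT_EXTRACTOR_SCHEMA
    System.out.println(record);  // {"output":"hello world"}
  }
}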
@@ -142,12 +143,12 @@ private void writeToStringBuffer(InputStream is, StringBuffer output) {
 output.append(String.valueOf(buffer, 0, len));
 totalBytes += len;
 if (totalBytes > TEXT_EXTRACTOR_BYTE_LIMIT) {
-  logger.warn("Download limit of {} bytes reached for text extractor ", TEXT_EXTRACTOR_BYTE_LIMIT);
+  LOG.warn("Download limit of {} bytes reached for text extractor ", TEXT_EXTRACTOR_BYTE_LIMIT);
   break;
 }
 }
 is.close();
-logger.info("TextExtractor: written {} bytes ", totalBytes);
+LOG.info("TextExtractor: written {} bytes ", totalBytes);
 } catch (IOException e) {
   throw new RuntimeException("Unable to extract text in TextExtractor", e);
 }
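A minimal standalone sketch of the bounded-read pattern in writeToStringBuffer, assuming UTF-8 input. Like the method above, it counts chars as an approximation of bytes and breaks only after the limit is crossed, so the result can slightly exceed the limit.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class BoundedReadDemo {
  private static final int LIMIT = 1048576;    // mirrors TEXT_EXTRACTOR_BYTE_LIMIT
  private static final int BUFFER_SIZE = 8192; // mirrors BUFFER_SIZE above

  static String readBounded(InputStream is) throws IOException {
    StringBuilder output = new StringBuilder();
    char[] buffer = new char[BUFFER_SIZE];
    long total = 0;
    try (InputStreamReader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
      int len;
      while ((len = reader.read(buffer)) != -1) {
        output.append(buffer, 0, len);
        total += len;        // char count used as a byte approximation
        if (total > LIMIT) {
          break;             // stop reading once past the limit
        }
      }
    }
    return output.toString();
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "sample text".getBytes(StandardCharsets.UTF_8);
    System.out.println(readBounded(new ByteArrayInputStream(data)));
  }
}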
cdi-core/src/main/java/com/linkedin/cdi/filter/CsvSchemaBasedFilter.java

@@ -5,7 +5,6 @@
 package com.linkedin.cdi.filter;

 import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
 import com.linkedin.cdi.keys.CsvExtractorKeys;
 import com.linkedin.cdi.util.JsonIntermediateSchema;
 import java.util.Arrays;
@@ -14,8 +13,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import static com.linkedin.cdi.configuration.StaticConstants.*;
-

 /**
  * Filter CSV records by Json Intermediate schema
@@ -26,41 +23,22 @@
 public class CsvSchemaBasedFilter extends MultistageSchemaBasedFilter<String[]> {
   private static final Logger LOG = LoggerFactory.getLogger(CsvSchemaBasedFilter.class);
   final private CsvExtractorKeys csvExtractorKeys;
-  final private JsonArray schemaArray;

   public CsvSchemaBasedFilter(JsonArray schema, CsvExtractorKeys csvExtractorKeys) {
     super(new JsonIntermediateSchema(schema));
     this.csvExtractorKeys = csvExtractorKeys;
-    this.schemaArray = schema;
   }

   @Override
   public String[] filter(String[] input) {
-    List<Integer> columnProjection = csvExtractorKeys.getColumnProjection();
-
-    if (columnProjection.isEmpty()
-        && csvExtractorKeys.getHeaderRow() != null
-        && csvExtractorKeys.getIsValidOutputSchema()) {
-      // use the header and schema to generate column projection, then filter
-      String[] headerRow = csvExtractorKeys.getHeaderRow();
-      for (JsonElement column: schemaArray) {
-        for (int i = 0; i < headerRow.length; i++) {
-          if (headerRow[i].equalsIgnoreCase(column.getAsJsonObject().get(KEY_WORD_COLUMN_NAME).getAsString())) {
-            columnProjection.add(i);
-          }
-        }
-      }
-      csvExtractorKeys.setColumnProjection(columnProjection);
-    }
-
-    if (columnProjection.size() > 0) {
-      return filter(input, columnProjection);
-    } else {
-      LOG.info("Defaulting to project first N columns");
-      // take first N column, where N is the number of columns in the schema
-      // if the schema's size larger than input, then the extra columns will be padded with null
-      return Arrays.copyOf(input, schema.getColumns().size());
-    }
+    if (!csvExtractorKeys.getColumnProjection().isEmpty()) {
+      return filter(input, csvExtractorKeys.getColumnProjection());
+    }
+
+    // LOG.debug("Defaulting to project first N columns");
+    // take first N column, where N is the number of columns in the schema
+    // if the schema's size larger than input, then the extra columns will be padded with null
+    return Arrays.copyOf(input, schema.getColumns().size());
   }
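The default branch leans on java.util.Arrays.copyOf semantics: copying a String[] to a longer length pads the tail with null, and copying to a shorter length truncates, which is the padding behavior the comment above describes. A short demonstration with invented values:

import java.util.Arrays;

public class CopyOfDemo {
  public static void main(String[] args) {
    String[] input = {"a", "b", "c"};
    // schema narrower than the input: extra columns dropped
    System.out.println(Arrays.toString(Arrays.copyOf(input, 2)));  // [a, b]
    // schema wider than the input: missing columns padded with null
    System.out.println(Arrays.toString(Arrays.copyOf(input, 5)));  // [a, b, c, null, null]
  }
}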

/**
@@ -77,6 +55,12 @@ String[] filter(String[] input, List<Integer> columnProjection) {
 String[] output = new String[columnProjection.size()];
 for (int i = 0; i < output.length; i++) {
   if (columnProjection.get(i) < input.length) {
+    if (columnProjection.get(i) >= input.length) {
+      LOG.info("Input columns: {}", input.length);
+      LOG.info("Column projection at position {} is {}", i, columnProjection.get(i));
+      LOG.info("Input: {}", Arrays.toString(input));
+      throw new RuntimeException("Index in column projection out of bound");
+    }
     output[i] = input[columnProjection.get(i)];
   } else {
     output[i] = StringUtils.EMPTY;
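And a hypothetical call against the two-argument filter above, with invented data. Under the committed logic the outer bounds check sends out-of-range indices to the else branch, so they come back as empty strings; this sketch reproduces the projection behavior minus the logging.

import java.util.Arrays;
import java.util.List;

public class ProjectionFilterDemo {
  // Same projection behavior as the overload above, without logging.
  static String[] project(String[] input, List<Integer> columnProjection) {
    String[] output = new String[columnProjection.size()];
    for (int i = 0; i < output.length; i++) {
      int idx = columnProjection.get(i);
      output[i] = idx < input.length ? input[idx] : "";  // out-of-range index becomes ""
    }
    return output;
  }

  public static void main(String[] args) {
    String[] input = {"x", "y", "z"};
    System.out.println(Arrays.toString(project(input, Arrays.asList(2, 0, 5))));  // [z, x, ]
  }
}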
2 changes: 1 addition & 1 deletion gradle/scripts/dependencyDefinitions.gradle
@@ -127,7 +127,7 @@ ext.externalDependency = [
 "googleAnalytics": "com.google.apis:google-api-services-analytics:v3-rev134-1.22.0",
 "googleDrive": "com.google.apis:google-api-services-drive:v3-rev42-1.22.0",
 "googleWebmasters": "com.google.apis:google-api-services-webmasters:v3-rev17-1.22.0",
-"opencsv": "com.opencsv:opencsv:3.8",
+"opencsv": "com.opencsv:opencsv:3.9",
 "grok": "io.thekraken:grok:0.1.5",
 "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
 "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.5:nohive",
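Since CsvExtractor parses with opencsv (note the com.opencsv.CSVParser import above), here is a minimal sketch of the 3.x reader API that the bumped dependency provides; the CSV sample is invented.

import com.opencsv.CSVReader;
import java.io.IOException;
import java.io.StringReader;

public class OpencsvDemo {
  public static void main(String[] args) throws IOException {
    try (CSVReader reader = new CSVReader(new StringReader("id,name\n1,alice\n"))) {
      String[] row;
      while ((row = reader.readNext()) != null) {
        System.out.println(String.join("|", row));  // prints id|name then 1|alice
      }
    }
  }
}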
