fix(filters): fix regression for configuring on failure filter (#114)
Resolves: #114
fhussonnois committed Mar 30, 2021
1 parent 2332c79 commit 63176ca
Showing 17 changed files with 203 additions and 118 deletions.
@@ -36,6 +36,11 @@ public interface RecordFilter extends Configurable {
@Override
void configure(final Map<String, ?> configs);

default void configure(final Map<String, ?> configs,
final RecordFilterProvider provider) {
configure(configs);
}

/**
* Configuration specification for this filter.
*
@@ -115,4 +120,11 @@ default RecordFilterPipeline<FileRecord<TypedStruct>> onFailure() {
default boolean ignoreFailure() {
return false;
}


@FunctionalInterface
static public interface RecordFilterProvider {

RecordFilter getRecordForAlias(final String alias);
}
}
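The `RecordFilterProvider` hook added above is the heart of the fix: a filter can now resolve its `on.failure` aliases against filter instances the connector has already built, instead of re-instantiating them from raw properties. A minimal sketch of the new contract, assuming the FilePulse artifact is on the classpath; the `byAlias` registry and the `ProviderSketch` class are invented for illustration:

import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter.RecordFilterProvider;

import java.util.HashMap;
import java.util.Map;

public class ProviderSketch {

    public static void main(String[] args) {
        // Hypothetical registry of filters already created and configured
        // from the connector's `filters.<alias>.*` properties.
        final Map<String, RecordFilter> byAlias = new HashMap<>();

        // RecordFilterProvider is a @FunctionalInterface with a single
        // method, getRecordForAlias(String), so a method reference fits.
        final RecordFilterProvider provider = byAlias::get;

        // Each filter now receives the provider alongside its own configs:
        // filter.configure(filterConfigs, provider);
    }
}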
@@ -18,19 +18,12 @@
*/
package io.streamthoughts.kafka.connect.filepulse.config;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.filter.condition.ExpressionFilterCondition;
import io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -91,25 +84,9 @@ public boolean ignoreFailure() {
return getBoolean(IGNORE_FAILURE_CONFIG);
}

public RecordFilterPipeline<FileRecord<TypedStruct>> onFailure() {
final List<String> filterAliases = getList(ON_FAILURE_CONFIG);

if (filterAliases == null) return null;

final List<RecordFilter> filters = new ArrayList<>(filterAliases.size());
for (String alias : filterAliases) {
final String prefix = "filters." + alias + ".";
try {
final RecordFilter filter = getClass(prefix + "type")
.asSubclass(RecordFilter.class)
.getDeclaredConstructor().newInstance();
filter.configure(originalsWithPrefix(prefix));
filters.add(filter);
} catch (Exception e) {
throw new ConnectException(e);
}
}
return filters.isEmpty() ? null : new DefaultRecordFilterPipeline(filters);
public List<String> onFailure() {
final List<String> aliases = getList(ON_FAILURE_CONFIG);
return aliases == null ? Collections.emptyList() : aliases;
}

public static ConfigDef withOverwrite(final ConfigDef def) {
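After this change, `CommonFilterConfig.onFailure()` only reports the configured aliases and never returns `null`; constructing the actual failure pipeline moves to the filter side, where a `RecordFilterProvider` can hand back shared, already-configured instances. A sketch of what a caller now does with those aliases, mirroring the `AbstractRecordFilter` hunk further down (the `OnFailureSketch` class and `buildOnFailure` method are illustrative names):

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter.RecordFilterProvider;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;

import java.util.List;
import java.util.stream.Collectors;

class OnFailureSketch {

    // `aliases` is the result of config.onFailure(); the provider resolves
    // each alias to the RecordFilter instance registered under it.
    static RecordFilterPipeline<FileRecord<TypedStruct>> buildOnFailure(
            final List<String> aliases,
            final RecordFilterProvider provider) {
        if (aliases.isEmpty()) {
            return null; // no on-failure pipeline configured
        }
        return new DefaultRecordFilterPipeline(aliases.stream()
                .map(provider::getRecordForAlias)
                .collect(Collectors.toList()));
    }
}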
@@ -18,27 +18,19 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.merger.DefaultTypeValueMerger;
import io.streamthoughts.kafka.connect.filepulse.data.merger.TypeValueMerger;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public abstract class AbstractMergeRecordFilter<T extends AbstractRecordFilter> extends AbstractRecordFilter<T> {
public abstract class AbstractMergeRecordFilter<T extends AbstractRecordFilter<T>> extends AbstractRecordFilter<T> {

private TypeValueMerger merger = new DefaultTypeValueMerger();
private final TypeValueMerger merger = new DefaultTypeValueMerger();

/**
* {@inheritDoc}
*/
@Override
public void configure(final Map<String, ?> props) {
super.configure(props);
}

/**
* {@inheritDoc}
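Separate from the provider plumbing, this file (and `AbstractRecordFilter` below) also tightens the raw bound `T extends AbstractRecordFilter` to the self-referential `T extends AbstractRecordFilter<T>`, keeping the self-typed API fully parameterized. A standalone illustration of the pattern, with invented names:

// `Base<T extends Base<T>>` lets a superclass method return the concrete
// subtype without raw types leaking to call sites.
abstract class Base<T extends Base<T>> {
    @SuppressWarnings("unchecked")
    T self() {
        return (T) this;
    }
}

final class Concrete extends Base<Concrete> {
    // Inherited self() is typed as Concrete here, so calls chain cleanly.
}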
@@ -18,16 +18,17 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition;
import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;
import java.util.stream.Collectors;

public abstract class AbstractRecordFilter<T extends AbstractRecordFilter> implements RecordFilter {
public abstract class AbstractRecordFilter<T extends AbstractRecordFilter<T>> implements RecordFilter {

private RecordFilterPipeline<FileRecord<TypedStruct>> failurePipeline;

@@ -45,11 +46,24 @@ public abstract class AbstractRecordFilter<T extends AbstractRecordFilter> implements RecordFilter {
* {@inheritDoc}
*/
@Override
public void configure(final Map<String, ?> props) {
public void configure(final Map<String, ?> configs) {
// intentionally left blank
}

/**
* {@inheritDoc}
*/
@Override
public void configure(final Map<String, ?> props, final RecordFilterProvider provider) {
final CommonFilterConfig config = new CommonFilterConfig(configDef(), props);
condition = config.condition();
failurePipeline = config.onFailure();
ignoreFailure = config.ignoreFailure();
if (!config.onFailure().isEmpty()) {
failurePipeline = new DefaultRecordFilterPipeline(config.onFailure().stream()
.map(provider::getRecordForAlias)
.collect(Collectors.toList()));
}
configure(props);
}

/**
@@ -56,7 +56,7 @@ public void setUp() {
public void shouldSupportPropertyExpressionForValueConfig() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$.target");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
filter.configure(configs);
filter.configure(configs, alias -> null);

RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
Assert.assertNotNull(output);
@@ -68,7 +68,7 @@ public void shouldSupportPropertyExpressionForValueConfig() {
public void shouldSupportPropertyExpressionForFieldConfig() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$.target");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
filter.configure(configs);
filter.configure(configs, alias -> null);

RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
Assert.assertNotNull(output);
@@ -80,7 +80,7 @@ public void shouldSupportPropertyExpressionForFieldConfig() {
public void shouldSupportSubstitutionExpressionForFieldConfig() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "{{ '$.'extract_array($.values,0) }}");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
filter.configure(configs);
filter.configure(configs, alias -> null);

RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
Assert.assertNotNull(output);
@@ -92,7 +92,7 @@ public void shouldSupportSubstitutionExpressionForFieldConfig() {
public void shouldSupportPropertyExpressionWithScopeForFieldConfig() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$topic");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "my-topic-{{ extract_array($.values,0) }}");
filter.configure(configs);
filter.configure(configs, alias -> null);
filter.apply(context, STRUCT);
Assert.assertEquals("my-topic-foo", context.topic());
}
@@ -102,7 +102,7 @@ public void shouldOverwriteExistingValueGivenOverwriteTrue() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "true");
filter.configure(configs);
filter.configure(configs, alias -> null);
final TypedStruct input = TypedStruct.create().put("field", "foo");
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
Assert.assertEquals("bar", results.last().getString("field"));
@@ -113,7 +113,7 @@ public void shouldMergeExistingValueGivenOverwriteFalse() {
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);
final TypedStruct input = TypedStruct.create().put("field", "foo");
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
Assert.assertEquals("[foo, bar]", results.last().getArray("field").toString());
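In this test file and the ones that follow, every `filter.configure(configs)` call becomes the two-argument form. The `alias -> null` lambda is a no-op `RecordFilterProvider` stub, which appears safe here because none of these configs declare `on.failure` aliases, so the provider is never consulted.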
@@ -54,7 +54,7 @@ public void setUp() {
public void should_convert_value_given_valid_field() {
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
filter.configure(configs);
filter.configure(configs, alias -> null);

TypedStruct struct = TypedStruct.create().put("field", "yes");
List<TypedStruct> results = filter.apply(context, struct, false).collect();
@@ -71,7 +71,7 @@ public void should_convert_value_given_valid_field() {
public void should_convert_value_given_valid_path() {
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field.child");
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
filter.configure(configs);
filter.configure(configs, alias -> null);

TypedStruct struct = TypedStruct.create().insert("field.child", "yes");
List<TypedStruct> results = filter.apply(context, struct, false).collect();
@@ -89,7 +89,7 @@ public void should_fail_given_invalid_path_and_ignore_missing_false() {
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);
filter.apply(context, TypedStruct.create(), false).collect();
}

@@ -98,7 +98,7 @@ public void should_fail_given_not_convertible_value_and_not_default() {
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "integer");
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);

TypedStruct struct = TypedStruct.create().insert("field", "dummy");
filter.apply(context, struct, false).collect();
@@ -110,7 +110,7 @@ public void should_use_default_given_not_convertible_value() {
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "integer");
configs.put(ConvertFilterConfig.CONVERT_DEFAULT_CONFIG, "-1");
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);

TypedStruct struct = TypedStruct.create().insert("field", "dummy");
List<TypedStruct> results = filter.apply(context, struct, false).collect();
@@ -53,7 +53,7 @@ public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() {
configs.put(DateFilterConfig.DATE_TARGET_CONFIG, "$.timestamp");
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd'T'HH:mm:ss"));

filter.configure(configs);
filter.configure(configs, alias -> null);
TypedStruct struct = TypedStruct.create().put("date", "2001-07-04T12:08:56");
List<TypedStruct> results = filter.apply(context, struct, false).collect();

@@ -69,7 +69,7 @@ public void shouldConvertToEpochTimeGivenTimezone() {
configs.put(DateFilterConfig.DATE_TIMEZONE_CONFIG, "Europe/Paris");
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd'T'HH:mm:ss"));

filter.configure(configs);
filter.configure(configs, alias -> null);
TypedStruct struct = TypedStruct.create().put("date", "2001-07-04T14:08:56");
List<TypedStruct> results = filter.apply(context, struct, false).collect();

@@ -85,7 +85,7 @@ public void shouldConvertToEpochTimeGivenLocale() {
configs.put(DateFilterConfig.DATE_LOCALE_CONFIG, "fr_FR");
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("EEEE, d MMMM yyyy HH:mm:ss"));

filter.configure(configs);
filter.configure(configs, alias -> null);
TypedStruct struct = TypedStruct.create().put("date", "mercredi, 4 juillet 2001 12:08:56");
List<TypedStruct> results = filter.apply(context, struct, false).collect();

@@ -56,7 +56,7 @@ public void setUp() {

@Test
public void should_auto_generate_schema_given_no_schema_field() {
filter.configure(configs);
filter.configure(configs, alias -> null);
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
Assert.assertNotNull(output);
Assert.assertEquals(1, output.size());
@@ -70,7 +70,7 @@ public void should_auto_generate_schema_given_no_schema_field() {
@Test
public void should_extract_column_names_from_given_field() {
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
filter.configure(configs);
filter.configure(configs, alias -> null);
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
Assert.assertNotNull(output);
Assert.assertEquals(1, output.size());
@@ -85,7 +85,7 @@ public void should_extract_column_names_from_given_field() {
public void should_extract_repeated_columns_names_from_given_field() {
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
configs.put(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG, "true");
filter.configure(configs);
filter.configure(configs, alias -> null);

final TypedStruct input = TypedStruct.create()
.put("message", "value1;value2-1;value2-2;value2-3;value3;value2-4")
@@ -106,7 +106,7 @@ public void should_extract_repeated_columns_names_from_given_field() {
public void should_fail_given_repeated_columns_names_and_duplicate_not_allowed() {
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
configs.put(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);

final TypedStruct input = TypedStruct.create()
.put("message", "value1;value2-1;value2-2;value2-3;value3;value2-4")
@@ -119,7 +119,7 @@
@Test
public void should_use_configured_schema() {
configs.put(READER_FIELD_COLUMNS_CONFIG, "c1:STRING;c2:INTEGER;c3:BOOLEAN");
filter.configure(configs);
filter.configure(configs, alias -> null);
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
Assert.assertNotNull(output);
Assert.assertEquals(1, output.size());
@@ -54,7 +54,7 @@ public void setUp() {
public void shouldThrownExceptionWhenConditionIsTrue() {
configs.put(FailFilterConfig.MESSAGE_CONFIG, "Unexpected error");
FailFilter filter = new FailFilter();
filter.configure(configs);
filter.configure(configs, alias -> null);

try {
filter.apply(context, DEFAULT_DATA, false);
@@ -70,7 +70,7 @@ public void shouldThrownExceptionWhenConditionIsTrue() {
public void shouldEvaluateMessageExpression() {
configs.put(FailFilterConfig.MESSAGE_CONFIG, "Unexpected error : {{ $value.message }}");
FailFilter filter = new FailFilter();
filter.configure(configs);
filter.configure(configs, alias -> null);

try {
filter.apply(context, DEFAULT_DATA, false);
@@ -50,7 +50,7 @@ public void setUp() {
@Test
public void testGivenDefaultProperties() {
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
filter.configure(configs);
filter.configure(configs, alias -> null);
List<TypedStruct> results = filter.apply(null, DATA, false).collect();

Assert.assertEquals(1, results.size());
@@ -66,7 +66,7 @@ public void testGivenDefaultProperties() {
public void testGivenOverwriteProperty() {
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
filter.configure(configs);
filter.configure(configs, alias -> null);
List<TypedStruct> results = filter.apply(null, DATA, false).collect();

Assert.assertEquals(1, results.size());
@@ -80,7 +80,7 @@ public void testGivenOverwriteProperty() {
public void testGivenNotMatchingInput() {
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
filter.configure(configs);
filter.configure(configs, alias -> null);
filter.apply(null, TypedStruct.create().put("message", "BAD INPUT"), false);
}

@@ -89,7 +89,7 @@
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_PATTERN);
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
configs.put(GrokFilterConfig.GROK_ROW_NAMED_CAPTURES_ONLY_CONFIG, "false");
filter.configure(configs);
filter.configure(configs, alias -> null);
List<TypedStruct> results = filter.apply(null, DATA, false).collect();

Assert.assertEquals(1, results.size());