Filter operation is not working. #126

Open
himanshuankercloud opened this issue Jan 16, 2024 · 4 comments · May be fixed by #127

Comments

himanshuankercloud commented Jan 16, 2024

We are currently setting up an MSK Connect pipeline to transfer data from a Kafka topic to OpenSearch, using the Aiven connector as a plugin. We have created a topic and produced dummy data to it. We have also configured an MSK connector that reads the data from the topic and successfully writes it to OpenSearch.

However, when we filter the data on a specific field, nothing is written to OpenSearch (the index is not even created). Below is our MSK Connect configuration:

connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
type.name=kafka-connect
connection.password=***********
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.name=EventId
tasks.max=1
topics=json-topic
connection.username=sample
transforms=Filter
bootstrap.servers="**************************"
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
transforms.Filter.field.value=4690
transforms.Filter.field.value.matches=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url="
"
key.converter=org.apache.kafka.connect.storage.StringConverter
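As we understand it, the intent of this config is to keep only records whose `EventId` value field equals 4690 (`field.value.matches=true` keeps matching records; `false` would keep the non-matching ones instead). The Python sketch below models that intent under two assumptions: the schemaless JSON value arrives as a dict, and values are compared as plain strings. `keep_record` and the sample records are hypothetical; this is not the transform's implementation.

```python
import json


def keep_record(value: dict, field_name: str, field_value: str, matches: bool = True) -> bool:
    """Hypothetical model of the intended filter: compare the field with the configured value as strings."""
    actual = value.get(field_name)
    is_match = actual is not None and str(actual) == field_value
    # matches=True keeps matching records; matches=False keeps the non-matching ones.
    return is_match if matches else not is_match


# Hypothetical sample records, for illustration only.
records = [json.loads(s) for s in ('{"EventId": 4690}', '{"EventId": 1234}')]
kept = [r for r in records if keep_record(r, "EventId", "4690", matches=True)]
print(kept)  # [{'EventId': 4690}]
```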

No errors are reported, but the filtered data is not written to OpenSearch. I have also checked the topic, and it does contain records with EventId 4690.

Can you please help us?

Contributor

jeqo commented Jan 23, 2024

Hi @himanshuankercloud.

It's hard to troubleshoot this without knowing what the input messages look like. Could you share some sample input messages so we can see how this transformation is being applied?
You could also disable the SMT and share what the input and output messages look like; that may give us an idea of what is missing.

You could also look at the FilterByFieldValueTest tests to try to reproduce the issue.

PS: I suggest not posting real endpoints in public issues, as that could lead to security problems.

Author

himanshuankercloud commented Jan 25, 2024 via email

Author

himanshuankercloud commented Jan 31, 2024

@jeqo
Could you please share your feedback? It's urgent, and your help would be very useful to us.
Thanks in advance.

Contributor

jeqo commented Jan 31, 2024

@himanshuankercloud, thanks for sharing the details on how to reproduce this.

I can confirm that this is an issue. It seems to be a bug in how the value schema is inferred by the Connect schema library and how validation happens in the transform:

This causes the filter test to always fail. See the test in https://github.com/Aiven-Open/transforms-for-apache-kafka-connect/pull/127/files
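A simplified way to picture the mismatch: if the configured `field.value` and the record's field carry different integer type tags, a strict comparison fails even though the numbers are equal. The Python sketch below assumes, for illustration only, that the configured string is inferred as the narrowest integer type that fits (so `4690` becomes INT16) and that the record's field arrives as a wider type (INT64 here, an assumption). `infer_int_type` and `typed_equals` are hypothetical helpers, not the transform's or the Connect library's code.

```python
# Simplified, hypothetical model of the type mismatch; not the library's code.
INT_RANGES = [
    ("INT8", -(2**7), 2**7 - 1),
    ("INT16", -(2**15), 2**15 - 1),
    ("INT32", -(2**31), 2**31 - 1),
    ("INT64", -(2**63), 2**63 - 1),
]


def infer_int_type(text: str) -> tuple[str, int]:
    """Assumption: parse the configured value into the narrowest integer type that fits."""
    n = int(text)
    for name, lo, hi in INT_RANGES:
        if lo <= n <= hi:
            return name, n
    raise ValueError(f"{text} does not fit in INT64")


def typed_equals(expected: tuple[str, int], actual: tuple[str, int]) -> bool:
    """Strict comparison: the type tag and the numeric value must both match."""
    return expected == actual


expected = infer_int_type("4690")
actual = ("INT64", 4690)  # assumed type tag for the record's EventId
print(expected, typed_equals(expected, actual))  # ('INT16', 4690) False
```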

I see a couple of alternatives for fixing this:

  • Add a new config option to set the type of the expected value (in this case, INT16).
  • If the value is numeric, validate it against every possible type. In this case, instead of only testing the input value as INT16, also test it against INT8 and INT32.
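Both options can be sketched in a simplified model where values are tagged with an integer type. Option 1 takes the expected type from a new config option (hypothetical; no such option exists yet); option 2 tries every integer type wide enough to hold the configured value, which for 4690 means INT16, INT32 and INT64. All names below are hypothetical, and the record's INT64 tag is an assumption for illustration.

```python
# Hypothetical models of the two proposed fixes; not the project's code.
INT_RANGES = {
    "INT8": (-(2**7), 2**7 - 1),
    "INT16": (-(2**15), 2**15 - 1),
    "INT32": (-(2**31), 2**31 - 1),
    "INT64": (-(2**63), 2**63 - 1),
}


def matches_configured_type(expected_type: str, expected_text: str, actual: tuple[str, int]) -> bool:
    """Option 1: a new (hypothetical) config option states the expected type instead of inferring it."""
    return actual == (expected_type, int(expected_text))


def candidates(expected_text: str) -> list[tuple[str, int]]:
    """Option 2: every integer type wide enough to hold the configured value."""
    n = int(expected_text)
    return [(name, n) for name, (lo, hi) in INT_RANGES.items() if lo <= n <= hi]


def matches_any_width(expected_text: str, actual: tuple[str, int]) -> bool:
    """Option 2: accept the record if it equals any of the candidate typed values."""
    return actual in candidates(expected_text)


record_field = ("INT64", 4690)  # assumed type tag for the record's EventId, for illustration only
print(candidates("4690"))  # [('INT16', 4690), ('INT32', 4690), ('INT64', 4690)]
print(matches_any_width("4690", record_field))  # True
print(matches_configured_type("INT64", "4690", record_field))  # True
```

Option 2 needs no new configuration, at the cost of a few extra comparisons per record; option 1 is more explicit but pushes the type decision onto the user.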

Given this mismatch between numeric types, you might be able to work around it with other SMTs that cast the value properly so that the filter condition is applied correctly: for example, set EventId as the key, cast it to INT16, and apply the filter on the key instead of the value. That may be enough for your use case while this bug is being fixed.
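The workaround can be modelled the same way: copy `EventId` into the key, cast it to INT16, then compare it with a filter value that is also inferred as INT16. In a real pipeline the first two steps would presumably be Kafka Connect SMTs such as `ValueToKey` and `Cast$Key`, but that chain is untested here; the Python below only models the idea, every function in it is hypothetical, and the record's INT64 tag is an assumption.

```python
# Hypothetical model of the suggested workaround; real SMTs may behave differently.
INT16_MIN, INT16_MAX = -(2**15), 2**15 - 1


def value_to_key(value: dict, field: str) -> dict:
    """Copy one value field into a map key, tagged as INT64 (assumed type of a JSON integer)."""
    return {field: ("INT64", value[field])}


def cast_key_field_to_int16(key: dict, field: str) -> dict:
    """Re-tag one key field as INT16; this sketch rejects out-of-range values instead of truncating."""
    _, n = key[field]
    if not INT16_MIN <= n <= INT16_MAX:
        raise ValueError(f"{n} does not fit in INT16")
    return {**key, field: ("INT16", n)}


def filter_on_key(key: dict, field: str, expected: tuple[str, int]) -> bool:
    """Strict typed comparison on the key instead of the value."""
    return key.get(field) == expected


value = {"EventId": 4690}  # hypothetical record value
key = cast_key_field_to_int16(value_to_key(value, "EventId"), "EventId")
print(key, filter_on_key(key, "EventId", ("INT16", 4690)))  # {'EventId': ('INT16', 4690)} True
```

A real cast may truncate out-of-range values rather than reject them, so event IDs that do not fit in INT16 would need a wider target type.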
