Flatten transform causes this Connector to fail #139

Open
KPull opened this issue Nov 5, 2022 · 0 comments
Labels
help wanted (Extra attention is needed)

KPull commented Nov 5, 2022

The HTTP sink connector I had configured for my Aiven Kafka service started failing after I added a 'Flatten' transform (org.apache.kafka.connect.transforms.Flatten$Value) to it.

I kept getting the following exception:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Record value must be String, Schema Struct or HashMap, but class java.util.LinkedHashMap is given
	at io.aiven.kafka.connect.http.converter.RecordValueConverter.convert(RecordValueConverter.java:44)
	at io.aiven.kafka.connect.http.recordsender.BatchRecordSender.createRequestBody(BatchRecordSender.java:77)
	at io.aiven.kafka.connect.http.recordsender.BatchRecordSender.send(BatchRecordSender.java:60)
	at io.aiven.kafka.connect.http.HttpSinkTask.put(HttpSinkTask.java:97)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

To be clear, this only started happening after I applied the "Flatten" transform to the record value.
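
A possible explanation, not confirmed against the connector source: the error message lists HashMap as accepted but rejects java.util.LinkedHashMap, which is a subclass of HashMap. That pattern is what you would see if the value type were checked by exact runtime class rather than by `instanceof`. The sketch below only illustrates that difference; `ValueTypeCheckDemo`, `SUPPORTED`, and both method names are hypothetical and are not the connector's API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ValueTypeCheckDemo {
    // Hypothetical stand-in for a converter registry keyed by exact runtime class.
    private static final Set<Class<?>> SUPPORTED = Set.of(String.class, HashMap.class);

    // Exact-class check: a subclass such as LinkedHashMap is not found.
    static boolean acceptedByExactClass(Object value) {
        return SUPPORTED.contains(value.getClass());
    }

    // Type-hierarchy check: any String or Map implementation is accepted.
    static boolean acceptedByInstanceOf(Object value) {
        return value instanceof String || value instanceof Map;
    }

    public static void main(String[] args) {
        Map<String, Object> plain = new HashMap<>();
        Map<String, Object> ordered = new LinkedHashMap<>();

        System.out.println(acceptedByExactClass(plain));    // prints "true"
        System.out.println(acceptedByExactClass(ordered));  // prints "false"
        System.out.println(acceptedByInstanceOf(ordered));  // prints "true"
    }
}
```

If this is indeed the cause, any transform that hands the sink a Map implementation other than HashMap would trigger the same DataException, not only Flatten.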

Here is the configuration I was using:

{
    "http.url": "PLACEHOLDER_URL",
    "batching.enabled": "true",
    "name": "clickhouse_player_view_events",
    "http.authorization.type": "static",
    "batch.max.size": "2500",
    "transforms": "transform-1",
    "connector.class": "io.aiven.kafka.connect.http.HttpSinkConnector",
    "http.headers.authorization": "PLACEHOLDER_AUTHORIZATION_HEADER",
    "tasks.max": "2",
    "http.headers.content.type": "application/json",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.log.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.include.messages": "true",
    "topics": "player_analytics",
    "request.method": "POST",
    "retry.on.status.codes": "400-500",
    "ssl.enabled": "true",
    "value.converter.schemas.enable": "false",
    "transforms.transform-1.delimiter": ".",
    "transforms.transform-1.type": "org.apache.kafka.connect.transforms.Flatten$Value"
}
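
For context, with `value.converter.schemas.enable` set to `false` the record value reaches the transform as a schemaless map, and the two `transforms.transform-1.*` settings ask for nested fields to be collapsed into top-level keys joined by ".". The sketch below is a simplified stand-in for that behaviour, not Kafka's implementation; `FlattenSketch` and the sample field names (`player`, `id`, `name`, `event`) are made up for illustration. It builds its result in a LinkedHashMap, which is assumed here to mirror the type the real transform hands to the sink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    // Collapse nested maps into one level, joining key paths with the delimiter.
    static Map<String, Object> flatten(Map<String, Object> value, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(out, "", value, delimiter);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(Map<String, Object> out, String prefix,
                                    Map<String, Object> value, String delimiter) {
        for (Map.Entry<String, Object> e : value.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + delimiter + e.getKey();
            if (e.getValue() instanceof Map) {
                // Recurse into nested maps, extending the key path.
                flattenInto(out, key, (Map<String, Object>) e.getValue(), delimiter);
            } else {
                out.put(key, e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> player = new LinkedHashMap<>();
        player.put("id", 7);
        player.put("name", "a");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("player", player);
        record.put("event", "view");

        Map<String, Object> flat = flatten(record, ".");
        System.out.println(flat);                   // prints "{player.id=7, player.name=a, event=view}"
        System.out.println(flat.getClass().getName()); // prints "java.util.LinkedHashMap"
    }
}
```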
jeqo added the "help wanted" label on Jul 5, 2023