Support for replica placement constraints topic configuration #1327

Open
rca0 opened this issue Sep 10, 2024 · 0 comments

I’m currently running Kafka in a multi-region setup where topics use replica-placement configurations. I built an internal tool on the segmentio/kafka-go package that creates topics automatically in our pipeline whenever a branch is merged. The topics are defined in YAML configuration files.

Example topic-config.yaml:

topics:
  topic_one:
    partitions: 1
    replicas: 3
    configs:
        retention.ms: 720000

  topic_two:
    partitions: 6
    replicas: 2
    configs:
        min.insync.replicas: 3

This configuration works smoothly.
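
A minimal sketch of how the `configs` map from such a file could be turned into name/value pairs, assuming the YAML has already been parsed into a Go map. `ConfigEntry` here is a local stand-in that only mirrors the field names of `kafka.ConfigEntry`, and `TopicSpec` and `toConfigEntries` are hypothetical names used for illustration. The point is that numeric YAML values such as `720000` need to become strings before they are handed to the client.

```go
package main

import (
	"fmt"
	"sort"
)

// ConfigEntry is a local stand-in with the same field names as
// kafka.ConfigEntry, so this sketch runs without the kafka-go dependency.
type ConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

// TopicSpec is a hypothetical in-memory form of one entry under "topics:".
type TopicSpec struct {
	Partitions int
	Replicas   int
	Configs    map[string]any
}

// toConfigEntries converts parsed YAML config values into string pairs,
// sorted by name so the result is deterministic.
func toConfigEntries(configs map[string]any) []ConfigEntry {
	names := make([]string, 0, len(configs))
	for name := range configs {
		names = append(names, name)
	}
	sort.Strings(names)

	entries := make([]ConfigEntry, 0, len(names))
	for _, name := range names {
		entries = append(entries, ConfigEntry{
			ConfigName:  name,
			ConfigValue: fmt.Sprint(configs[name]), // ints become "720000", etc.
		})
	}
	return entries
}

func main() {
	spec := TopicSpec{
		Partitions: 1,
		Replicas:   3,
		Configs:    map[string]any{"retention.ms": 720000},
	}
	for _, e := range toConfigEntries(spec.Configs) {
		fmt.Printf("%s=%s\n", e.ConfigName, e.ConfigValue)
	}
}
```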
However, we have a Kafka cluster where topics must be created with the replica-placement option. We currently do this manually and would like to automate it. For example:

kafka-topics --bootstrap-server $kafka_broker --create --topic topic_name --partitions 5 --config "min.insync.replicas=3" --replica-placement topic-reg.json

# Content of topic-reg.json
{
  "version": 1,
  "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}],
  "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]
}
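
Since the placement file ends up being passed around as a single string, it can help to confirm it is well-formed JSON first. Below is a minimal sketch using only the Go standard library; `compactPlacementJSON` is a hypothetical helper name, and the inline JSON stands in for the file contents. Whether the broker needs the value compacted is not something this sketch assumes; the compaction is only a convenient way to get a validated one-line string.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// compactPlacementJSON checks that raw is well-formed JSON and removes
// insignificant whitespace so the value fits on a single line.
func compactPlacementJSON(raw []byte) (string, error) {
	var buf bytes.Buffer
	if err := json.Compact(&buf, raw); err != nil {
		return "", fmt.Errorf("placement constraints are not valid JSON: %w", err)
	}
	return buf.String(), nil
}

func main() {
	// Stand-in for the contents of the placement file.
	raw := []byte(`{
  "version": 1,
  "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}],
  "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]
}`)

	value, err := compactPlacementJSON(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(value)
}
```

A missing closing brace or comma in the file is reported here as an error instead of surfacing later as a rejected request.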

I tried implementing this in code using the segmentio/kafka-go package, but it’s not working. I attempted to pass the contents of this JSON file as the value of a kafka.ConfigEntry, and the code looks something like this:

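    // PlacementConstraints, Replica, and Constraint mirror the layout of
    // topic-reg.json.
    constraints := PlacementConstraints{
        Version: 1,
        Replicas: []Replica{
            {Count: 3, Constraints: Constraint{Rack: "us-east-1"}},
        },
        Observers: []Replica{
            {Count: 3, Constraints: Constraint{Rack: "us-east-2"}},
        },
    }

    // Serialize the constraints; the JSON is passed as a plain string value.
    jsonConstraints, err := json.Marshal(constraints)
    if err != nil {
        log.Fatalf("marshal placement constraints: %v", err)
    }
    topicConfig := kafka.TopicConfig{
        Topic: "topic-name",
        ConfigEntries: []kafka.ConfigEntry{
            {
                ConfigName:  "confluent.placement.constraints",
                ConfigValue: string(jsonConstraints),
            },
        },
    }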
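
The type definitions behind the snippet above are not shown. A minimal sketch of types that would marshal to the same layout as topic-reg.json could look like the following, assuming JSON tags named after the keys in that file (the tags and the `marshalPlacement` helper are assumptions for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Constraint holds the rack a group of replicas is pinned to.
// The JSON tags are assumed from the keys in topic-reg.json.
type Constraint struct {
	Rack string `json:"rack"`
}

// Replica describes how many replicas (or observers) match a constraint.
type Replica struct {
	Count       int        `json:"count"`
	Constraints Constraint `json:"constraints"`
}

// PlacementConstraints is the top-level document.
type PlacementConstraints struct {
	Version   int       `json:"version"`
	Replicas  []Replica `json:"replicas"`
	Observers []Replica `json:"observers,omitempty"`
}

// marshalPlacement returns the constraints as a JSON string.
func marshalPlacement(p PlacementConstraints) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", fmt.Errorf("marshal placement constraints: %w", err)
	}
	return string(b), nil
}

func main() {
	value, err := marshalPlacement(PlacementConstraints{
		Version:   1,
		Replicas:  []Replica{{Count: 3, Constraints: Constraint{Rack: "us-east-1"}}},
		Observers: []Replica{{Count: 3, Constraints: Constraint{Rack: "us-east-2"}}},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(value)
}
```

Printing the marshaled string before sending it makes it easy to compare against the contents of the JSON file used with the CLI.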

However, I’m getting an error from the segmentio/kafka-go package:

Invalid Request: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. Check the broker logs for more details.

The resulting topic description should look like this:

Topic: topic_one   TopicId: DRdkdkd0djdjkdkd PartitionCount: 3 Configs: min.insync.replicas=3,retention.ms=720000,confluent.placement.constraints={"version": 1,"replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}], "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]}

As the output shows, confluent.placement.constraints appears as a regular entry under Configs, but it seems the library doesn’t support setting it.

Supporting documentation
