Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No rebalance after empty assignment in consumer group #1314

Open
arxon31 opened this issue Aug 6, 2024 · 0 comments
Open

No rebalance after empty assignment in consumer group #1314

arxon31 opened this issue Aug 6, 2024 · 0 comments
Labels

Comments

@arxon31
Copy link

arxon31 commented Aug 6, 2024

Describe the bug

PartitionWatcher in the consumer group cannot track the creation of a topic. The problem occurs if the topic is created after JoinGroup, but before running PartitionWatcher.

I noticed that Kafka creates a topic asynchronously and the completion of this operation can occur at a time when consumer in the consumer group has already received an empty assignment, but the partitionWatcher has not started yet. Then partitionWatcher at the start counts the partitions of the created topic from Kafka, sees any partitions there (depends on what parameters the topic was created with) and enters a loop in which it polls Kafka and checks the current number of partitions in the topic with the initial one.

But my flow of working with Kafka does not imply adding partitions to a topic in runtime, and accordingly, a consumer in a consumer group will never start consuming data from a topic partition.

Kafka Version

To Reproduce

Resources to reproduce the behavior:

x-kafka-broker: &kafka-broker  
  image: confluentinc/cp-kafka:7.6.0  
  expose:  
    - 29092  
    - 9092  
    - 9093  
  deploy:  
    resources:  
      limits:  
        cpus: '2'  
        memory: 1024M  
  
x-kafka-env: &kafka-env  
  CLUSTER_ID: Atx8sxWjSXuyC43GCCUpAA  
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
  KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
  KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER  
  KAFKA_PROCESS_ROLES: controller,broker  
  KAFKA_LOG_DIRS: /tmp/kraft-combined-logs  
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  
  KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093  
  
services:  
  kafka-node-1:  
    <<: *kafka-broker  
    hostname: kafka1  
    container_name: kafka-node-1  
    links:  
      - kafka-node-2  
      - kafka-node-3  
    environment:  
      <<: *kafka-env  
      KAFKA_NODE_ID: 1  
      KAFKA_BROKER_ID: 1  
      KAFKA_LISTENERS: PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:9093,PLAINTEXT_HOST://kafka1:9092  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://kafka1:9092  
    networks:  
      default:  
        ipv4_address: 172.18.0.11  
  
  kafka-node-2:  
    <<: *kafka-broker  
    hostname: kafka2  
    container_name: kafka-node-2  
    environment:  
      <<: *kafka-env  
      KAFKA_NODE_ID: 2  
      KAFKA_BROKER_ID: 2  
      KAFKA_LISTENERS: PLAINTEXT://kafka2:29092,CONTROLLER://kafka2:9093,PLAINTEXT_HOST://kafka2:9092  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://kafka2:9092  
    networks:  
      default:  
        ipv4_address: 172.18.0.12  
  
  kafka-node-3:  
    <<: *kafka-broker  
    hostname: kafka3  
    container_name: kafka-node-3  
    environment:  
      <<: *kafka-env  
      KAFKA_NODE_ID: 3  
      KAFKA_BROKER_ID: 3  
      KAFKA_LISTENERS: PLAINTEXT://kafka3:29092,CONTROLLER://kafka3:9093,PLAINTEXT_HOST://kafka3:9092  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://kafka3:9092  
    networks:  
      default:  
        ipv4_address: 172.18.0.13  
  
  schema-registry:  
    image: confluentinc/cp-schema-registry:7.6.0  
    container_name: schema-registry  
    depends_on:  
      - kafka-node-1  
      - kafka-node-2  
      - kafka-node-3  
    environment:  
      SCHEMA_REGISTRY_HOST_NAME: schema-registry  
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:29092  
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081  
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT  
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas  
    ports:  
      - "8081:8081"  
  
  kafka-ui:  
    image: provectuslabs/kafka-ui:v0.4.0  
    container_name: kafka-ui  
    ports:  
      - "8080:8080"  
    depends_on:  
      - kafka-node-1  
      - kafka-node-2  
      - kafka-node-3  
      - schema-registry  
    environment:  
      KAFKA_CLUSTERS_0_NAME: local  
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092  
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081  
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect  
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083  
  
networks:  
  default:  
    name: kafka-test  
    ipam:  
      config:  
        - subnet: 172.18.0.0/24  
          gateway: 172.18.0.1
package main  
  
import (  
    "context"  
    "fmt"    
    "github.com/segmentio/kafka-go"    
    "os/signal"    
    "sync"    
    "syscall"    
    "time")  
  
func main() {  
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGQUIT)  
    defer cancel()  
  
    brokers := []string{"kafka1", "kafka2", "kafka3"}  
  
    wg := &sync.WaitGroup{}  
  
    for i := 0; i < 20; i++ {  
       topics := []string{fmt.Sprintf("topic-%d", i)}  
       group := fmt.Sprintf("group-%d", i)  
       config := kafka.ReaderConfig{  
          Brokers:                brokers,  
          GroupTopics:            topics,  
          GroupID:                group,  
          WatchPartitionChanges:  true,  
          PartitionWatchInterval: time.Second,  
       }  
  
       r := kafka.NewReader(config)  
  
       wg.Add(1)  
  
       go func(r *kafka.Reader, wg *sync.WaitGroup) {  
          defer func() {  
             r.Close()  
             wg.Done()  
          }()  
          r.ReadMessage(ctx)  
       }(r, wg)  
    }  
  
    <-ctx.Done()  
  
    wg.Wait()  
}

Expected Behavior

All created consumer groups assigned with topics

Observed Behavior

Some consumer groups do not have topics. Consumers don't read messages. In my flow using Kafka, these consumers can only receive their messages after reloading the application.

Often times, pasting the logging output from a kafka.Reader or kafka.Writer will
provide useful details to help maintainers investigate the issue and provide a
fix. If possible, providing stack traces or CPU/memory profiles may also contain
valuable information to understand the conditions that triggered the issue.

Additional Context

Rarely reproduced with a single Kafka instance. Most often with a cluster.

Screenshot from kafka-ui

Screenshot from 2024-08-06 12-44-08

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant