Kafka Reader unable to read events from kafka when using StartOffset: kafka.LastOffset and GroupID when groupID is new #1291

Open
SpaskeISO opened this issue May 13, 2024 · 1 comment

Describe the bug

When a Kafka Reader is configured with StartOffset: kafka.LastOffset and a GroupID that has never been used before, the Reader does not read any messages.
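
For context, here is a minimal sketch of how a start position is usually resolved for a consumer group member. It is a toy model of the general consumer group semantics, not kafka-go's actual implementation. The `partition` type and `resolveStart` function are hypothetical names introduced for illustration; only the sentinel values -1 and -2 correspond to kafka-go's `kafka.LastOffset` and `kafka.FirstOffset`.

```go
package main

import "fmt"

// Sentinel values mirroring kafka-go's exported constants
// kafka.LastOffset (-1) and kafka.FirstOffset (-2).
const (
	lastOffset  int64 = -1
	firstOffset int64 = -2
)

// partition is a hypothetical stand-in for one topic partition.
type partition struct {
	logStart  int64 // offset of the oldest retained message
	logEnd    int64 // offset the next produced message will receive
	committed int64 // committed group offset, or -1 if the group has none
}

// resolveStart models where a group member begins reading a partition.
func resolveStart(p partition, startOffset int64) int64 {
	if p.committed >= 0 {
		return p.committed // a committed offset takes precedence
	}
	if startOffset == firstOffset {
		return p.logStart // replay everything still retained
	}
	return p.logEnd // lastOffset: skip everything already in the log
}

func main() {
	// A brand-new group: nothing committed yet, 100 messages already in the log.
	p := partition{logStart: 0, logEnd: 100, committed: -1}
	for _, so := range []int64{lastOffset, firstOffset} {
		start := resolveStart(p, so)
		fmt.Printf("StartOffset=%d: begin at %d, %d existing messages readable\n",
			so, start, p.logEnd-start)
	}
}
```

Under this model, a new group that starts at the last offset has no existing messages to read and only sees messages produced after it joins, which may be relevant to the behavior described here.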

Kafka Version

  • What version(s) of Kafka are you testing against?
    7.6.0
  • What version of kafka-go are you using?
    v0.4.47

To Reproduce

Resources to reproduce the behavior:

# docker compose file generated with https://github.com/sknop/kafka-docker-composer
services:
    controller-1:
        image: confluentinc/cp-server:7.6.0
        hostname: controller-1
        container_name: controller-1

        environment:
            KAFKA_NODE_ID: 1
            CLUSTER_ID: Nk018hRAQFytWskYqtQduw
            KAFKA_PROCESS_ROLES: controller
            KAFKA_LISTENERS: CONTROLLER://controller-1:19091
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: CONTROLLER
            KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
            KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
            KAFKA_JMX_PORT: 9999
            KAFKA_JMX_HOSTNAME: controller-1
            KAFKA_BROKER_RACK: rack-0
            KAFKA_DEFAULT_REPLICATION_FACTOR: 2
            KAFKA_OFFSET_REPLICATION_FACTOR: 2
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
            KAFKA_CONFLUENT_METADATA_TOPIC_REPLICATION_FACTOR: 2
            KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 2
            KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
            KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
        cap_add:
            - NET_ADMIN
        ports:
            - 19091:19091
        volumes:
            - $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
            - $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml

    kafka-1:
        image: confluentinc/cp-server:7.6.0
        hostname: kafka-1
        container_name: kafka-1

        healthcheck:
            test: curl --fail --silent http://kafka-1:8090/kafka/v3/clusters/ --output /dev/null || exit 1
            interval: 10s
            retries: 10
            start_period: 20s
        depends_on:
            - controller-1
        environment:
            KAFKA_LISTENERS: PLAINTEXT://kafka-1:19092, EXTERNAL://0.0.0.0:9091
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092, EXTERNAL://localhost:9091
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_JMX_PORT: 10001
            KAFKA_JMX_HOSTNAME: localhost
            KAFKA_BROKER_RACK: rack-0
            KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
            KAFKA_MIN_INSYNC_REPLICAS: 1
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
            KAFKA_CONFLUENT_CLUSTER_LINK_ENABLE: False
            KAFKA_CONFLUENT_REPORTERS_TELEMETRY_AUTO_ENABLE: False
            KAFKA_NODE_ID: 2
            CLUSTER_ID: Nk018hRAQFytWskYqtQduw
            KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
            KAFKA_PROCESS_ROLES: broker
            KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
            KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        cap_add:
            - NET_ADMIN
        ports:
            - 9091:9091
            - 10001:10001
            - 10101:8091
            - 10201:8090
        volumes:
            - $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
            - $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml

    kafka-2:
        image: confluentinc/cp-server:7.6.0
        hostname: kafka-2
        container_name: kafka-2

        healthcheck:
            test: curl --fail --silent http://kafka-2:8090/kafka/v3/clusters/ --output /dev/null || exit 1
            interval: 10s
            retries: 10
            start_period: 20s
        depends_on:
            - controller-1
        environment:
            KAFKA_LISTENERS: PLAINTEXT://kafka-2:19093, EXTERNAL://0.0.0.0:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:19093, EXTERNAL://localhost:9092
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_JMX_PORT: 10002
            KAFKA_JMX_HOSTNAME: localhost
            KAFKA_BROKER_RACK: rack-0
            KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
            KAFKA_MIN_INSYNC_REPLICAS: 1
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
            KAFKA_CONFLUENT_CLUSTER_LINK_ENABLE: False
            KAFKA_CONFLUENT_REPORTERS_TELEMETRY_AUTO_ENABLE: False
            KAFKA_NODE_ID: 3
            CLUSTER_ID: Nk018hRAQFytWskYqtQduw
            KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
            KAFKA_PROCESS_ROLES: broker
            KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
            KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        cap_add:
            - NET_ADMIN
        ports:
            - 9092:9092
            - 10002:10002
            - 10102:8091
            - 10202:8090
        volumes:
            - $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
            - $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml

    kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui:latest
        ports:
            - 8080:8080
        depends_on:
            - controller-1
            - kafka-1
            - kafka-2
        environment:
            KAFKA_CLUSTERS_0_NAME: local
            KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: controller-1:19091,kafka-1:19092,kafka-2:19093
            KAFKA_CLUSTERS_0_METRICS_PORT: 9999
            DYNAMIC_CONFIG_ENABLED: 'true'




    portainer:
        image: portainer/portainer-ce:2.20.2
        container_name: portainer
        restart: unless-stopped
        ports:
        - 9000:9000
        volumes:
        - /var/run/docker.sock:/var/run/docker.sock
        - portainer_data:/data

volumes:
  portainer_data:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"example.com/kafka/util/configurator"
	timeutil "example.com/kafka/util/time-util"
	"github.com/segmentio/kafka-go"
)

var (
	cfgFileName string
)

func init() {
	flag.StringVar(&cfgFileName, "cfg", "/home/veljko/Desktop/Faks/Master Studije/Prva Godina/2. Semestar/Big Data инфраструктуре и сервиси/Projekat/kod/Kafka/Code/kafka/doc/consumer.conf", "File path to the file containing the configuration")
}

type Message struct {
	ID      string `json:"id"`
	Type    int    `json:"type"`
	Kind    int    `json:"kind"`
	Content string `json:"content"`
}

func main() {
	flag.Parse()
	defer timeutil.MeasureExecutionTime(time.Now())

	var config configurator.KafkaConsumerConfig

	// Load configuration
	err := configurator.LoadConfiguration(cfgFileName, &config)
	if err != nil {
		log.Fatalf("Failed to load configuration: %s", err)
	}

	// Create new Kafka reader
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     strings.Split(config.BootstrapServers, ","),
		GroupID:     config.GroupID,
		Topic:       config.Topic,
		StartOffset: kafka.LastOffset,
		MaxBytes:    config.MaxBytes, // maximum batch size the reader accepts, in bytes
	})

	// Close the reader when main function exits
	defer r.Close()

	// Handle OS signals for graceful shutdown
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Start consuming messages
	for {
		select {
		case <-sigchan:
			slog.Info("Received shutdown signal. Shutting down consumer...")
			return
		default:
			// Read a message from Kafka
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			msg, err := r.ReadMessage(ctx)
			cancel()

			if err != nil {
				slog.Error(fmt.Sprintf("Error reading message: %v", err))
				continue
			}

			// Process the message
			var message Message
			err = json.Unmarshal(msg.Value, &message)
			if err != nil {
				slog.Error(fmt.Sprintf("Error decoding message: %v", err))
				continue
			}

			// Handle the message
			slog.Info(fmt.Sprintf("Received message: %+v\n", message))
		}
	}
}

Config for the Go program

{
    "topic":"json_test_topic",
    "bootstrapServers":"localhost:9090,localhost:9091,localhost:9092",
    "groupId": "test-group",
    "batchBytes": 1000000
}
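
The `configurator` package is not shown in the issue, so the following is only a sketch of how a file like this could be decoded with the standard library. The `consumerConfig` struct and `parseConfig` function are hypothetical; in particular, how the `batchBytes` key relates to the `config.MaxBytes` field used in the program is an open assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// consumerConfig is a hypothetical stand-in for configurator.KafkaConsumerConfig,
// with field tags matching the keys of the JSON config in the issue.
type consumerConfig struct {
	Topic            string `json:"topic"`
	BootstrapServers string `json:"bootstrapServers"`
	GroupID          string `json:"groupId"`
	BatchBytes       int    `json:"batchBytes"`
}

// parseConfig decodes the JSON and splits the comma-separated broker list,
// trimming whitespace and dropping empty entries.
func parseConfig(raw []byte) (consumerConfig, []string, error) {
	var cfg consumerConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, nil, fmt.Errorf("decoding config: %w", err)
	}
	var brokers []string
	for _, b := range strings.Split(cfg.BootstrapServers, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return cfg, brokers, nil
}

func main() {
	raw := []byte(`{
	    "topic":"json_test_topic",
	    "bootstrapServers":"localhost:9090,localhost:9091,localhost:9092",
	    "groupId": "test-group",
	    "batchBytes": 1000000
	}`)
	cfg, brokers, err := parseConfig(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("topic=%s group=%s batchBytes=%d brokers=%v\n",
		cfg.Topic, cfg.GroupID, cfg.BatchBytes, brokers)
}
```

Printing the parsed values at startup makes it easy to compare the broker list against the ports the compose file actually publishes.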

Expected Behavior

The program should consume messages from Kafka and print them to the console.

Observed Behavior

No messages are consumed. Every ReadMessage call times out:

2024/05/13 22:22:21 ERROR Error reading message: fetching message: context deadline exceeded
^C2024/05/13 22:22:26 ERROR Error reading message: fetching message: context deadline exceeded
2024/05/13 22:22:26 INFO Received shutdown signal. Shutting down consumer...
2024/05/13 22:22:34 INFO Elapsed time: 18.00940981s
SpaskeISO added the bug label May 13, 2024

junbach commented Jun 25, 2024

Hi @SpaskeISO, are there any new messages coming through during all this time?
