Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing kafka Writer during WriteMessages causes a potential hang #1307

Open
ionutboangiu opened this issue Jul 8, 2024 · 0 comments
Open
Labels

Comments

@ionutboangiu
Copy link

Describe the bug

Closing the kafka.Writer while attempting to write a message can cause the process to hang indefinitely. This issue is resolved by closing the writer again.

Kafka Version

  • Kafka Version: 3.7.0
  • kafka-go Version: current main branch

To Reproduce

package main

import (
	"context"
	"log"
	"net"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	brokerURL := "localhost:9092"
	topic := "test"
	createTopic(topic, brokerURL)
	writer := &kafka.Writer{
		Addr:   kafka.TCP(brokerURL),
		Topic:  topic,
		Logger: log.Default(),

		Transport: &kafka.Transport{
			Dial: (&net.Dialer{
				Timeout: 3 * time.Second,
			}).DialContext,
		},
	}
	done := make(chan struct{})
	go func() {
		time.Sleep(1 * time.Millisecond)
		writer.Close()
		close(done)
	}()
	if err := writer.WriteMessages(context.Background(), kafka.Message{
		Value: []byte("payload"),
	}); err != nil {
		log.Fatal(err)
	}

	<-done
	log.Print("does not reach this point")
}

func createTopic(name, brokerURL string) {
	conn, err := kafka.Dial("tcp", brokerURL)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err)
	}
	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err)
	}
	defer controllerConn.Close()

	err = controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             name,
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}
}

Expected Behavior

Expecting kafka.Writer.Close() not to hang.

Observed Behavior

Hangs here: w.group.Wait()

*batchQueue.cond.Wait() waits for a broadcast signal, preventing the decrementing of the waitgroup counter. The signal would typically come from closing the writer here, but at that point, the w.writers map is still empty.

Sequence of function calls leading to it:

Closing the writer again resolves the issue.

Additional Context

This issue happened in an application that creates writers with different transports, which are closed after a configurable period of inactivity. The transports are not reused, causing the related goroutines (*connPool).discover and (*conn).run to also hang indefinitely.

This is almost a non-issue in real scenarios unless the inactivity period is set to a very low duration. I can also work around it by closing the writer again if the previous close doesn't complete in time. However, I thought you might want to know about it regardless.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant