Skip to content

Commit

Permalink
Fix: Closing kafka Writer during WriteMessages causes a potential hang
Browse files Browse the repository at this point in the history
  • Loading branch information
iamgoroot committed Jul 23, 2024
1 parent 4713019 commit 23da334
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
13 changes: 10 additions & 3 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,10 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
assignments[key] = append(assignments[key], int32(i))
}

batches := w.batchMessages(msgs, assignments)
batches, err := w.batchMessages(msgs, assignments)
if err != nil {
return err
}
if w.Async {
return nil
}
Expand Down Expand Up @@ -695,7 +698,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
return werr
}

func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) (map[*writeBatch][]int32, error) {
var batches map[*writeBatch][]int32
if !w.Async {
batches = make(map[*writeBatch][]int32, len(assignments))
Expand All @@ -704,6 +707,10 @@ func (w *Writer) batchMessages(messages []Message, assignments map[topicPartitio
w.mutex.Lock()
defer w.mutex.Unlock()

if w.closed {
return nil, io.ErrClosedPipe
}

if w.writers == nil {
w.writers = map[topicPartition]*partitionWriter{}
}
Expand All @@ -721,7 +728,7 @@ func (w *Writer) batchMessages(messages []Message, assignments map[topicPartitio
}
}

return batches
return batches, nil
}

func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
Expand Down
41 changes: 41 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func TestWriter(t *testing.T) {
scenario: "test write message with writer data",
function: testWriteMessageWithWriterData,
},
{
scenario: "test no new partition writers after close",
function: testWriterNoNewPartitionWritersAfterClose,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -1030,6 +1034,43 @@ func testWriterOverrideConfigStats(t *testing.T) {
}
}

func testWriterNoNewPartitionWritersAfterClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
topic1 := makeTopic()
createTopic(t, topic1, 1)
defer deleteTopic(t, topic1)

w := newTestWriter(WriterConfig{
Topic: topic1,
})
defer w.Close() // try and close anyway after test finished

// using balancer to close writer right between first mutex is released and second mutex is taken to make map of partition writers
w.Balancer = mockBalancerFunc(func(m Message, i ...int) int {
go w.Close() // close is blocking so run in goroutine
for !w.closed { // wait until writer is marked as closed
time.Sleep(time.Millisecond)
}
w.mutex.Lock() // ability to get mutex means w.Close is waiting to finish pending tasks
defer w.mutex.Unlock()
return 0
})

msg := Message{Value: []byte("Hello World")} // no topic

if err := w.WriteMessages(ctx, msg); !errors.Is(err, io.ErrClosedPipe) {
t.Errorf("expected error: %v got: %v", io.ErrClosedPipe, err)
return
}
}

type mockBalancerFunc func(msg Message, partitions ...int) (partition int)

func (b mockBalancerFunc) Balance(msg Message, partitions ...int) int {
return b(msg, partitions...)
}

type staticBalancer struct {
partition int
}
Expand Down

0 comments on commit 23da334

Please sign in to comment.