Skip to content

Commit

Permalink
Merge pull request #3020 from redpanda-data/channels
Browse files Browse the repository at this point in the history
snowflake: fix channel creation leak
  • Loading branch information
rockwotj authored Nov 21, 2024
2 parents ead84e4 + 61f7f86 commit 42953e5
Show file tree
Hide file tree
Showing 17 changed files with 776 additions and 202 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ All notable changes to this project will be documented in this file.
- (Benthos) New Bloblang method `timestamp`. (@mihaitodor)
- (Benthos) New `benchmark` processor. (@ooesili)

### Fixed

- Addresses an issue where `snowflake_streaming` could create more channels than configured. (@rockwotj)

### Changed

- The `snowflake_streaming` output with `schema_evolution.enabled` set to true can now autocreate tables. (@rockwotj)
Expand Down
31 changes: 25 additions & 6 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ output:
byte_size: 0
period: ""
check: ""
max_in_flight: 64
max_in_flight: 4
```
--
Expand Down Expand Up @@ -103,14 +103,16 @@ output:
this == "timestamp" => "TIMESTAMP"
_ => "VARIANT"
}
build_parallelism: 1
build_options:
parallelism: 1
chunk_size: 50000
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
max_in_flight: 64
max_in_flight: 4
channel_prefix: "" # No default (optional)
```
Expand Down Expand Up @@ -384,15 +386,32 @@ The input to this mapping is an object with the value and the name of the new co
*Default*: `"root = match this.value.type() {\n this == \"string\" =\u003e \"STRING\"\n this == \"bytes\" =\u003e \"BINARY\"\n this == \"number\" =\u003e \"DOUBLE\"\n this == \"bool\" =\u003e \"BOOLEAN\"\n this == \"timestamp\" =\u003e \"TIMESTAMP\"\n _ =\u003e \"VARIANT\"\n}"`
=== `build_parallelism`
=== `build_options`
The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.
Options to optimize the time to build output data that is sent to Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.
*Type*: `object`
=== `build_options.parallelism`
The maximum amount of parallelism to use.
*Type*: `int`
*Default*: `1`
=== `build_options.chunk_size`
The number of rows to chunk for parallelization.
*Type*: `int`
*Default*: `50000`
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
Expand Down Expand Up @@ -502,7 +521,7 @@ The maximum number of messages to have in flight at a given time. Increase this
*Type*: `int`
*Default*: `64`
*Default*: `4`
=== `channel_prefix`
Expand Down
143 changes: 143 additions & 0 deletions internal/asyncroutine/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncroutine

import (
"context"
"fmt"
)

type (
// Batcher is a object for managing a background goroutine that accepts a number of requests
// and executes them serially.
Batcher[Request, Response any] struct {
requestChan chan batcherRequest[Request, Response]

cancel context.CancelFunc
done chan any
}
batcherRequest[Request, Response any] struct {
req Request
respCh chan batcherResponse[Response]
}
batcherResponse[Response any] struct {
resp Response
err error
}
)

// NewBatcher creates a background goroutine that collects batches of requests and calls `fn`
// with them. `fn` should take a number of requests and return a number of responses, where the
// index of each request should line up the resulting response slice if error is `nil`.
func NewBatcher[Request any, Response any](
maxBatchSize int,
fn func(context.Context, []Request) ([]Response, error),
) (*Batcher[Request, Response], error) {
if maxBatchSize <= 0 {
return nil, fmt.Errorf("invalid maxBatchSize=%d, must be > 0", maxBatchSize)
}
b := &Batcher[Request, Response]{
requestChan: make(chan batcherRequest[Request, Response], maxBatchSize),
}
ctx, cancel := context.WithCancel(context.Background())
b.cancel = cancel
b.done = make(chan any)
go b.runLoop(ctx, fn)
return b, nil
}

func (b *Batcher[Request, Response]) runLoop(ctx context.Context, fn func(context.Context, []Request) ([]Response, error)) {
defer func() {
close(b.done)
}()
for {
batch := b.dequeueAll(ctx)
if len(batch) == 0 {
return
}
batchRequest := make([]Request, len(batch))
for i, msg := range batch {
batchRequest[i] = msg.req
}
responses, err := fn(ctx, batchRequest)
if err == nil && len(responses) != len(batch) {
err = fmt.Errorf("invalid number of responses, expected=%d got=%d", len(batch), len(responses))
}
if err != nil {
for _, msg := range batch {
msg.respCh <- batcherResponse[Response]{err: err}
}
continue
}
for i, resp := range responses {
batch[i].respCh <- batcherResponse[Response]{resp: resp}
}
}
}

func (b *Batcher[Request, Response]) dequeueAll(ctx context.Context) (batch []batcherRequest[Request, Response]) {
for {
if len(batch) >= cap(b.requestChan) {
return
}
select {
case req := <-b.requestChan:
batch = append(batch, req)
default:
if len(batch) > 0 {
return
}
// Blocking wait for next request
select {
case req := <-b.requestChan:
batch = append(batch, req)
// look and see if another request snuck in, otherwise we'll exit next iteration of the loop.
case <-ctx.Done():
return
}
}
}
}

// Submit sends a request to be batched with other requests, the response and error is returned.
func (b *Batcher[Request, Response]) Submit(ctx context.Context, req Request) (resp Response, err error) {
respCh := make(chan batcherResponse[Response], 1)
b.requestChan <- batcherRequest[Request, Response]{req, respCh}
select {
case br := <-respCh:
resp = br.resp
err = br.err
case <-ctx.Done():
err = ctx.Err()
}
return
}

// Close cancels any outgoing requests and waits for the background goroutine to exit.
//
// NOTE: One should *never* call Submit after calling Close (even if Close hasn't returned yet).
func (b *Batcher[Request, Response]) Close() {
if b.cancel == nil {
return
}
b.cancel()
<-b.done
b.done = nil
b.cancel = nil
close(b.requestChan)
for req := range b.requestChan {
req.respCh <- batcherResponse[Response]{err: context.Canceled}
}
}
101 changes: 101 additions & 0 deletions internal/asyncroutine/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncroutine

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type req struct{ i int }
type resp struct{ i int }

func TestBatcherCancellation(t *testing.T) {
b, err := NewBatcher(3, func(ctx context.Context, reqs []req) (resps []resp, err error) {
<-ctx.Done()
err = ctx.Err()
return
})
require.NoError(t, err)

// test request cancellation
ctx, cancel := context.WithCancel(context.Background())
var done atomic.Bool
go func() {
_, err := b.Submit(ctx, req{1})
require.ErrorIs(t, err, context.Canceled)
done.Store(true)
}()
time.Sleep(5 * time.Millisecond)
require.False(t, done.Load())
cancel()
require.Eventually(t, func() bool { return done.Load() }, time.Second, time.Millisecond)

// test batcher cancellation
done.Store(false)
go func() {
_, err := b.Submit(context.Background(), req{1})
require.ErrorIs(t, err, context.Canceled)
done.Store(true)
}()
time.Sleep(5 * time.Millisecond)
require.False(t, done.Load())
b.Close()
require.Eventually(t, func() bool { return done.Load() }, time.Second, time.Millisecond)
}

func TestBatching(t *testing.T) {
batchSize := make(chan int)
b, err := NewBatcher(3, func(_ context.Context, reqs []req) (resps []resp, err error) {
batchSize <- len(reqs)
resps = make([]resp, len(reqs))
for i, req := range reqs {
resps[i].i = req.i
}
return
})
require.NoError(t, err)

var done, submitted sync.WaitGroup
done.Add(100)
submitted.Add(100)
for i := range 100 {
go func(i int) {
submitted.Done()
resp, err := b.Submit(context.Background(), req{i})
require.NoError(t, err)
require.Equal(t, i, resp.i)
done.Done()
}(i)
}
submitted.Wait()

// We can't strictly assert anything here without races,
// but in general we should get *some* batching
batches := 0
for batches < 100 {
size := <-batchSize
require.Greater(t, size, 0)
require.LessOrEqual(t, size, 3)
batches += size
}
done.Wait()
b.Close()
}
18 changes: 18 additions & 0 deletions internal/asyncroutine/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// package asyncroutine contains several small common patterns around async goroutines
// that allows for clean shutdown and allows for writing plugins and ignoring some of
// the boilerplate around launching goroutines and shutting them down cleanly.
package asyncroutine
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package periodic
package asyncroutine

import (
"context"
Expand All @@ -34,18 +34,18 @@ type Periodic struct {
done chan any
}

// New creates new background work that runs every `duration` and performs `work`.
func New(duration time.Duration, work func()) *Periodic {
// NewPeriodic creates new background work that runs every `duration` and performs `work`.
func NewPeriodic(duration time.Duration, work func()) *Periodic {
return &Periodic{
duration: duration,
work: func(context.Context) { work() },
}
}

// NewWithContext creates new background work that runs every `duration` and performs `work`.
// NewPeriodicWithContext creates new background work that runs every `duration` and performs `work`.
//
// Work is passed a context that is cancelled when the overall periodic is cancelled.
func NewWithContext(duration time.Duration, work func(context.Context)) *Periodic {
func NewPeriodicWithContext(duration time.Duration, work func(context.Context)) *Periodic {
return &Periodic{
duration: duration,
work: work,
Expand Down
Loading

0 comments on commit 42953e5

Please sign in to comment.