From 368d918803b0ab60523e35c3f23b1bb26b097756 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 15 Nov 2024 23:41:10 +0000 Subject: [PATCH 01/14] Make sync bulk indexer respect flush::bytes --- exporter/elasticsearchexporter/bulkindexer.go | 27 +++++++++---------- exporter/elasticsearchexporter/factory.go | 4 +++ 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 1700b03619d6..2358c4567c31 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -88,6 +88,7 @@ func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config return &syncBulkIndexer{ config: bulkIndexerConfig(client, config), flushTimeout: config.Timeout, + flushBytes: config.Flush.Bytes, retryConfig: config.Retry, logger: logger, } @@ -96,6 +97,7 @@ func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config type syncBulkIndexer struct { config docappender.BulkIndexerConfig flushTimeout time.Duration + flushBytes int retryConfig RetrySettings logger *zap.Logger } @@ -124,8 +126,15 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { - return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { + err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) + if err != nil { + return err + } + if s.bi.Len() > s.s.flushBytes { + return s.Flush(ctx) + } + return nil } // End is a no-op. 
@@ -170,16 +179,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi numWorkers = runtime.NumCPU() } - flushInterval := config.Flush.Interval - if flushInterval == 0 { - flushInterval = 30 * time.Second - } - - flushBytes := config.Flush.Bytes - if flushBytes == 0 { - flushBytes = 5e+6 - } - pool := &asyncBulkIndexer{ wg: sync.WaitGroup{}, items: make(chan docappender.BulkIndexerItem, config.NumWorkers), @@ -195,9 +194,9 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi w := asyncBulkIndexerWorker{ indexer: bi, items: pool.items, - flushInterval: flushInterval, + flushInterval: config.Flush.Interval, flushTimeout: config.Timeout, - flushBytes: flushBytes, + flushBytes: config.Flush.Bytes, logger: logger, stats: &pool.stats, } diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 9cb5da2af966..e79883d99962 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -94,6 +94,10 @@ func createDefaultConfig() component.Config { MaxSizeItems: 10000, }, }, + Flush: FlushSettings{ + Bytes: 5e+6, + Interval: 30 * time.Second, + }, } } From c8d838e199b12931aa2184a8c74da15f066d8188 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 15 Nov 2024 23:48:50 +0000 Subject: [PATCH 02/14] Fix tests --- exporter/elasticsearchexporter/bulkindexer_test.go | 4 ++++ exporter/elasticsearchexporter/config_test.go | 9 ++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index e0213adaab79..4f423fbb1799 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -122,6 +122,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) { config: Config{ NumWorkers: 1, Mapping: MappingsSettings{Mode: MappingECS.String()}, + Flush: 
FlushSettings{Interval: time.Hour, Bytes: 1e+8}, }, wantRequireDataStream: false, }, @@ -130,6 +131,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) { config: Config{ NumWorkers: 1, Mapping: MappingsSettings{Mode: MappingOTel.String()}, + Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, }, wantRequireDataStream: true, }, @@ -252,6 +254,7 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { config: Config{ NumWorkers: 1, ClientConfig: confighttp.ClientConfig{Compression: "none"}, + Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, }, }, { @@ -259,6 +262,7 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { config: Config{ NumWorkers: 1, ClientConfig: confighttp.ClientConfig{Compression: "gzip"}, + Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, }, }, } diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index baec2bd9646a..bc803173440b 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -93,7 +93,8 @@ func TestConfig(t *testing.T) { OnStart: true, }, Flush: FlushSettings{ - Bytes: 10485760, + Bytes: 10485760, + Interval: 30 * time.Second, }, Retry: RetrySettings{ Enabled: true, @@ -164,7 +165,8 @@ func TestConfig(t *testing.T) { OnStart: true, }, Flush: FlushSettings{ - Bytes: 10485760, + Bytes: 10485760, + Interval: 30 * time.Second, }, Retry: RetrySettings{ Enabled: true, @@ -235,7 +237,8 @@ func TestConfig(t *testing.T) { OnStart: true, }, Flush: FlushSettings{ - Bytes: 10485760, + Bytes: 10485760, + Interval: 30 * time.Second, }, Retry: RetrySettings{ Enabled: true, From cc8fbc8d16b9189193cabfc894be498a0843f12c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 15 Nov 2024 23:52:08 +0000 Subject: [PATCH 03/14] Update readme --- exporter/elasticsearchexporter/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md 
b/exporter/elasticsearchexporter/README.md index 53a093109557..7ec4d60f4c27 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -97,7 +97,9 @@ The Elasticsearch exporter supports the [common `batcher` settings](https://gith By default, the exporter will perform its own buffering and batching, as configured through the `flush` config, and `batcher` will be unused. By setting `batcher::enabled` to either `true` or `false`, the exporter will not perform any of its own buffering or batching, and the `flush` config -will be ignored. In a future release when the `batcher` config is stable, and has feature parity +will be ignored, except `flush::bytes`. +`flush::bytes` can be used to limit the size of a bulk request, compressed or not, but the actual request body size may overshoot the limit. +In a future release when the `batcher` config is stable, and has feature parity with the exporter's existing `flush` config, it will be enabled by default. Using the common `batcher` functionality provides several benefits over the default behavior: From e7ea01f585cf68315e64fd8c7731063c2bbcc037 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Sat, 16 Nov 2024 00:00:29 +0000 Subject: [PATCH 04/14] Add changelog --- ...xporter_sync-bulk-indexer-flush-bytes.yaml | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 .chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml diff --git a/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml new file mode 100644 index 000000000000..cb51f9335a94 --- /dev/null +++ b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make sync bulk indexer respect `flush::bytes` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36163] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + Make sync bulk indexer respect flush::bytes to roughly limit the bulk request size. + Sync bulk indexer is used when batcher::enabled is either true or false. In other words, sync bulk indexer is not used when batcher config is undefined. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] From 2ab7b3dd92cc66820bc6963c3fc65ba544aef60c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Sat, 16 Nov 2024 00:14:59 +0000 Subject: [PATCH 05/14] Add test --- .../elasticsearchexporter/bulkindexer_test.go | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 4f423fbb1799..e09a26f121b7 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "strings" + "sync/atomic" "testing" "time" @@ -320,3 +321,28 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie return bulkIndexer } + +func TestSyncBulkIndexer_flushBytes(t *testing.T) { + var reqCnt atomic.Int64 + cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}} + client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + if r.URL.Path == "/_bulk" { + reqCnt.Add(1) + } + return &http.Response{ + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + Body: io.NopCloser(strings.NewReader(successResp)), + }, nil + }, + }}) + require.NoError(t, err) + + bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg) + session, err := bi.StartSession(context.Background()) + require.NoError(t, err) + + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes + assert.NoError(t, bi.Close(context.Background())) +} From 96ed8370023caaead5cc921caa9af040cb10717c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Sat, 16 Nov 2024 00:28:27 +0000 Subject: [PATCH 06/14] Use >= for consistency --- exporter/elasticsearchexporter/bulkindexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 2358c4567c31..a5213ef50ffc 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -131,7 +131,7 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document if err != nil { return err } - if s.bi.Len() > s.s.flushBytes { + if s.bi.Len() >= s.s.flushBytes { return s.Flush(ctx) } return nil From 14ca06c2bb2934ed2232fd45ff9620f29f3fbb64 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 18 Nov 2024 13:04:36 +0000 Subject: [PATCH 07/14] Update changelog --- ...elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml index cb51f9335a94..74cec1fe274f 100644 --- a/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml +++ b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Make sync bulk indexer respect `flush::bytes` +note: Respect `flush::bytes` in sync bulk indexer # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [36163] @@ -16,8 +16,8 @@ issues: [36163] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: - Make sync bulk indexer respect flush::bytes to roughly limit the bulk request size. - Sync bulk indexer is used when batcher::enabled is either true or false. In other words, sync bulk indexer is not used when batcher config is undefined. 
+ Limit the bulk request size to roughly `flush::bytes` for sync bulk indexer. + Sync bulk indexer is used when `batcher::enabled` is either true or false. In other words, sync bulk indexer is not used when batcher config is undefined. # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. From 3ce7e5b4b2532a7aac94812e52584ee750efda4e Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 18 Nov 2024 13:20:50 +0000 Subject: [PATCH 08/14] Update README --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 7ec4d60f4c27..f01f18aa05aa 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -211,7 +211,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`. > [!NOTE] -> The `flush` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`. +> The `flush::interval` config will be ignored when `batcher::enabled` config is explicitly set to `true` or `false`. 
### Elasticsearch node discovery From 6377281e89f6e5751803f56f00f868fd3222875b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 19 Nov 2024 16:27:30 +0000 Subject: [PATCH 09/14] Improve docs --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index f01f18aa05aa..0d6cb938911a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -200,7 +200,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. - `flush`: Event bulk indexer buffer flush settings - - `bytes` (default=5000000): Write buffer flush size limit. + - `bytes` (default=5000000): Write buffer flush size limit. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=30s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. 
From f8e8d3776285692e0547dfd9ad4dab62f9741820 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 19 Nov 2024 17:30:33 +0000 Subject: [PATCH 10/14] Use UncompressedLen --- exporter/elasticsearchexporter/README.md | 4 ++-- exporter/elasticsearchexporter/bulkindexer.go | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 0d6cb938911a..fd7321ffb111 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -98,7 +98,7 @@ By default, the exporter will perform its own buffering and batching, as configu `flush` config, and `batcher` will be unused. By setting `batcher::enabled` to either `true` or `false`, the exporter will not perform any of its own buffering or batching, and the `flush` config will be ignored, except `flush::bytes`. -`flush::bytes` can be used to limit the size of a bulk request, compressed or not, but the actual request body size may overshoot the limit. +`flush::bytes` can be used to limit the size of a bulk request, but the actual request body size may overshoot the limit. In a future release when the `batcher` config is stable, and has feature parity with the exporter's existing `flush` config, it will be enabled by default. @@ -200,7 +200,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. - `flush`: Event bulk indexer buffer flush settings - - `bytes` (default=5000000): Write buffer flush size limit. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. 
+ - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=30s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index a5213ef50ffc..5e762749a65d 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -131,7 +131,7 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document if err != nil { return err } - if s.bi.Len() >= s.s.flushBytes { + if s.bi.UncompressedLen() >= s.s.flushBytes { return s.Flush(ctx) } return nil @@ -297,8 +297,9 @@ func (w *asyncBulkIndexerWorker) run() { w.logger.Error("error adding item to bulk indexer", zap.Error(err)) } - // w.indexer.Len() can be either compressed or uncompressed bytes - if w.indexer.Len() >= w.flushBytes { + // flush bytes should operate on uncompressed length + // as Elasticsearch http.max_content_length measures uncompressed length. 
+ if w.indexer.UncompressedLen() >= w.flushBytes { w.flush() flushTick.Reset(w.flushInterval) } From d4aa307382d44318c14174aa2ff7fd76c04ac839 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 19 Nov 2024 17:47:21 +0000 Subject: [PATCH 11/14] Remove default max_size_items --- ...elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml | 6 ++++-- exporter/elasticsearchexporter/README.md | 2 +- exporter/elasticsearchexporter/config_test.go | 6 +++--- exporter/elasticsearchexporter/factory.go | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml index 74cec1fe274f..979d80bdfad8 100644 --- a/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml +++ b/.chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml @@ -1,13 +1,13 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement +change_type: breaking # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Respect `flush::bytes` in sync bulk indexer +note: Respect `flush::bytes` in sync bulk indexer, `flush::bytes` measures uncompressed size, change default `batcher::max_size_items` to `0` # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [36163] @@ -18,6 +18,8 @@ issues: [36163] subtext: Limit the bulk request size to roughly `flush::bytes` for sync bulk indexer. Sync bulk indexer is used when `batcher::enabled` is either true or false. In other words, sync bulk indexer is not used when batcher config is undefined. 
+ Change `flush::bytes` to always measure in uncompressed bytes. + Change default `batcher::max_size_items` to `0` as bulk request size limit is now more effectively enforced by `flush::bytes`. # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index fd7321ffb111..391debba8a1a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -91,7 +91,7 @@ The Elasticsearch exporter supports the [common `batcher` settings](https://gith - `batcher`: - `enabled` (default=unset): Enable batching of requests into a single bulk request. - `min_size_items` (default=5000): Minimum number of log records / spans in the buffer to trigger a flush immediately. - - `max_size_items` (default=10000): Maximum number of log records / spans in a request. + - `max_size_items` (default=0): Maximum number of log records / spans in a request. - `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the buffer, aka "max age of buffer". A flush will happen regardless of the size of content in buffer. 
By default, the exporter will perform its own buffering and batching, as configured through the diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index bc803173440b..f3f848db2c90 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -118,7 +118,7 @@ func TestConfig(t *testing.T) { MinSizeItems: 5000, }, MaxSizeConfig: exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 10000, + MaxSizeItems: 0, }, }, }, @@ -190,7 +190,7 @@ func TestConfig(t *testing.T) { MinSizeItems: 5000, }, MaxSizeConfig: exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 10000, + MaxSizeItems: 0, }, }, }, @@ -262,7 +262,7 @@ func TestConfig(t *testing.T) { MinSizeItems: 5000, }, MaxSizeConfig: exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 10000, + MaxSizeItems: 0, }, }, }, diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index e79883d99962..4783d430196a 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -91,7 +91,7 @@ func createDefaultConfig() component.Config { MinSizeItems: 5000, }, MaxSizeConfig: exporterbatcher.MaxSizeConfig{ - MaxSizeItems: 10000, + MaxSizeItems: 0, }, }, Flush: FlushSettings{ From 60be79f587cfee1fafb1ddbc1116f9d72f8d3141 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 19 Nov 2024 17:48:35 +0000 Subject: [PATCH 12/14] Add comment --- exporter/elasticsearchexporter/bulkindexer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 5e762749a65d..2200216be4ef 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -131,6 +131,8 @@ func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document if err != nil { return err } + // flush bytes should operate on uncompressed length + // as 
Elasticsearch http.max_content_length measures uncompressed length. if s.bi.UncompressedLen() >= s.s.flushBytes { return s.Flush(ctx) } From 2ba0d4f93c72ac15cdf0ef1d9c7536ab18f34219 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 20 Nov 2024 15:58:39 +0000 Subject: [PATCH 13/14] Advise against using max_size_items --- exporter/elasticsearchexporter/README.md | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 391debba8a1a..5bae4132f448 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -89,16 +89,15 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go). - `batcher`: - - `enabled` (default=unset): Enable batching of requests into a single bulk request. - - `min_size_items` (default=5000): Minimum number of log records / spans in the buffer to trigger a flush immediately. - - `max_size_items` (default=0): Maximum number of log records / spans in a request. - - `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the buffer, aka "max age of buffer". A flush will happen regardless of the size of content in buffer. + - `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`. + - `min_size_items` (default=5000): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush. + - `max_size_items` (default=0): Maximum number of log records / spans / data points in a batched request. To limit bulk request size, configure `flush::bytes` instead. 
:warning: It is recommended to keep `max_size_items` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections. + - `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer. By default, the exporter will perform its own buffering and batching, as configured through the `flush` config, and `batcher` will be unused. By setting `batcher::enabled` to either `true` or -`false`, the exporter will not perform any of its own buffering or batching, and the `flush` config -will be ignored, except `flush::bytes`. -`flush::bytes` can be used to limit the size of a bulk request, but the actual request body size may overshoot the limit. +`false`, the exporter will not perform any of its own buffering or batching, and the `flush::interval` config +will be ignored. In a future release when the `batcher` config is stable, and has feature parity with the exporter's existing `flush` config, it will be enabled by default. From ac14168c2ed8deef3886d919765276df85023691 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 21 Nov 2024 16:12:50 +0000 Subject: [PATCH 14/14] Update exporter/elasticsearchexporter/README.md Co-authored-by: Christos Markou --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5bae4132f448..13ecfa53507d 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -199,7 +199,7 @@ The behaviour of this bulk indexing can be configured with the following setting - `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. - `flush`: Event bulk indexer buffer flush settings - - `bytes` (default=5000000): Write buffer flush size limit before compression. 
A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. + - `bytes` (default=5000000): Write buffer flush size limit before compression. A bulk request will be sent immediately when its buffer exceeds this limit. This value should be much lower than [Elasticsearch's `http.max_content_length`](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#http-settings) config to avoid HTTP 413 Entity Too Large error. It is recommended to keep this value under 5MB. - `interval` (default=30s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.