Skip to content

Commit

Permalink
[exporter/elasticsearch] Add mapping mode bodymap (#35637)
Browse files Browse the repository at this point in the history
#### Description

This PR implements a new mapping mode `bodymap` that works by
serializing each LogRecord body as-is into a separate document for
ingestion.

Fixes #35444

#### Testing

#### Documentation

---------

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
  • Loading branch information
mauri870 and carsonip authored Oct 17, 2024
1 parent 694e7cf commit e7ebc6e
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feature_elasticsearch_mapping_bodymap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce an experimental bodymap mapping mode for logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35444]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 7 additions & 2 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,19 @@ behaviours, which may be configured through the following settings:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
- :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes.
- :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes.
- There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
- `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
- Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`.

- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
field names for span events.
- `bodymap`: Provides fine-grained control over the final documents to be ingested.
:warning: This mode's behavior is unstable, it is currently experimental and undergoing changes.
It works only for logs where the log record body is a map. Each LogRecord
body is serialized to JSON as-is and becomes a separate document for ingestion.
If the log record body is not a map, the exporter will log a warning and drop the log record.
- `dedup` (DEPRECATED). This configuration is deprecated and non-operational,
and will be removed in the future. Object keys are always deduplicated to
avoid Elasticsearch rejecting documents.
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ const (
MappingECS
MappingOTel
MappingRaw
MappingBodyMap
)

var (
Expand All @@ -224,6 +225,8 @@ func (m MappingMode) String() string {
return "otel"
case MappingRaw:
return "raw"
case MappingBodyMap:
return "bodymap"
default:
return ""
}
Expand All @@ -236,6 +239,7 @@ var mappingModes = func() map[string]MappingMode {
MappingECS,
MappingOTel,
MappingRaw,
MappingBodyMap,
} {
table[strings.ToLower(m.String())] = m
}
Expand Down
5 changes: 5 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return cerr
}

if errors.Is(err, ErrInvalidTypeForBodyMapMode) {
e.Logger.Warn("dropping log record", zap.Error(err))
continue
}

errs = append(errs, err)
}
}
Expand Down
125 changes: 125 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,131 @@ func TestExporterLogs(t *testing.T) {
rec.WaitItems(1)
})

t.Run("publish with bodymap encoding", func(t *testing.T) {
tableTests := []struct {
name string
body func() pcommon.Value
expected string
}{
{
name: "flat",
body: func() pcommon.Value {
body := pcommon.NewValueMap()
m := body.Map()
m.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z")
m.PutInt("id", 1)
m.PutStr("key", "value")
return body
},
expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`,
},
{
name: "dotted key",
body: func() pcommon.Value {
body := pcommon.NewValueMap()
m := body.Map()
m.PutInt("a", 1)
m.PutInt("a.b", 2)
m.PutInt("a.b.c", 3)
return body
},
expected: `{"a":1,"a.b":2,"a.b.c":3}`,
},
{
name: "slice",
body: func() pcommon.Value {
body := pcommon.NewValueMap()
m := body.Map()
s := m.PutEmptySlice("a")
for i := 0; i < 2; i++ {
s.AppendEmpty().SetInt(int64(i))
}
return body
},
expected: `{"a":[0,1]}`,
},
{
name: "inner map",
body: func() pcommon.Value {
body := pcommon.NewValueMap()
m := body.Map()
m1 := m.PutEmptyMap("a")
m1.PutInt("b", 1)
m1.PutInt("c", 2)
return body
},
expected: `{"a":{"b":1,"c":2}}`,
},
{
name: "nested map",
body: func() pcommon.Value {
body := pcommon.NewValueMap()
m := body.Map()
m1 := m.PutEmptyMap("a")
m2 := m1.PutEmptyMap("b")
m2.PutInt("c", 1)
m2.PutInt("d", 2)
return body
},
expected: `{"a":{"b":{"c":1,"d":2}}}`,
},
}

for _, tt := range tableTests {
t.Run(tt.name, func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
assert.JSONEq(t, tt.expected, string(docs[0].Document))
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "bodymap"
})
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := scopeLogs.LogRecords()
logRecord := logRecords.AppendEmpty()
tt.body().CopyTo(logRecord.Body())

mustSendLogs(t, exporter, logs)
rec.WaitItems(1)
})
}
})

t.Run("drops log records for bodymap mode if body is not a map", func(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := scopeLogs.LogRecords()

// Invalid body type should be dropped.
logRecords.AppendEmpty().Body().SetEmptySlice()

// We should still process the valid records in the batch.
bodyMap := logRecords.AppendEmpty().Body().SetEmptyMap()
bodyMap.PutInt("a", 42)

rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
defer rec.Record(docs)
assert.Len(t, docs, 1)
assert.JSONEq(t, `{"a":42}`, string(docs[0].Document))
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "bodymap"
})

err := exporter.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
rec.WaitItems(1)
})

t.Run("publish with dedot", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/elastic/go-docappender/v2 v2.3.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-structform v0.0.12
github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/strftime v1.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand Down
14 changes: 14 additions & 0 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"slices"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -65,6 +66,8 @@ var resourceAttrsToPreserve = map[string]bool{
semconv.AttributeHostName: true,
}

var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode")

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
Expand Down Expand Up @@ -107,6 +110,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
document = m.encodeLogECSMode(resource, record, scope)
case MappingOTel:
document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
case MappingBodyMap:
return m.encodeLogBodyMapMode(record)
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
Expand Down Expand Up @@ -138,6 +143,15 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
return document
}

func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) {
body := record.Body()
if body.Type() != pcommon.ValueTypeMap {
return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type())
}

return jsoniter.Marshal(body.Map().AsRaw())
}

func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document {
var document objmodel.Document

Expand Down
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,3 +1225,43 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) {
fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value")
assert.Equal(t, "foovalue", fooValue.Str)
}

func TestEncodeLogBodyMapMode(t *testing.T) {
// craft a log record with a body map
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := scopeLogs.LogRecords()
observedTimestamp := pcommon.Timestamp(time.Now().UnixNano())

logRecord := logRecords.AppendEmpty()
logRecord.SetObservedTimestamp(observedTimestamp)

bodyMap := pcommon.NewMap()
bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z")
bodyMap.PutInt("id", 1)
bodyMap.PutStr("key", "value")
bodyMap.PutStr("key.a", "a")
bodyMap.PutStr("key.a.b", "b")
bodyMap.PutDouble("pi", 3.14)
bodyMap.CopyTo(logRecord.Body().SetEmptyMap())

m := encodeModel{}
got, err := m.encodeLogBodyMapMode(logRecord)
require.NoError(t, err)

require.JSONEq(t, `{
"@timestamp": "2024-03-12T20:00:41.123456789Z",
"id": 1,
"key": "value",
"key.a": "a",
"key.a.b": "b",
"pi": 3.14
}`, string(got))

// invalid body map
logRecord.Body().SetEmptySlice()
_, err = m.encodeLogBodyMapMode(logRecord)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode)
}

0 comments on commit e7ebc6e

Please sign in to comment.