Skip to content

Commit

Permalink
feat(chdump): add OTLP logs ingester
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 10, 2024
1 parent 2f6ca79 commit 0bbc9c7
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
56 changes: 56 additions & 0 deletions cmd/otelbench/chdump/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package chdump

import (
"context"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"golang.org/x/sync/errgroup"
)

// IngestLogs loads logs from dump and sends them to the collector.
//
// The zero value is usable: see Workers for the applied default.
type IngestLogs struct {
// Workers is the number of workers to use.
//
// Each worker is a goroutine that exports batches to the collector;
// the same number is also used as the capacity of the internal batch queue.
//
// Defaults to 1. Values below 1 are treated as 1.
Workers int
}

// Run ingests logs.
//
// It reads the dump from tr via Consume, converts every [Logs] table passed to
// the OnLogs callback into an OTLP batch and exports the batches using client.
//
// One goroutine produces batches while max(lg.Workers, 1) goroutines export
// them concurrently. All goroutines share an errgroup context derived from ctx,
// so the first error cancels the rest; that error is returned.
func (lg IngestLogs) Run(ctx context.Context, client plogotlp.GRPCClient, tr TableReader) error {
	n := max(lg.Workers, 1)
	// The queue is buffered with one slot per worker.
	queue := make(chan plog.Logs, n)

	g, gctx := errgroup.WithContext(ctx)

	// produce feeds the queue and closes it when Consume returns,
	// which lets the exporters finish their range loops.
	produce := func() error {
		defer close(queue)

		opts := ConsumeOptions{
			OnLogs: func(t *Logs) error {
				logs := plog.NewLogs()
				t.ToOTLP(logs)

				// Do not block forever on a full queue if the group was canceled.
				select {
				case <-gctx.Done():
					return gctx.Err()
				case queue <- logs:
					return nil
				}
			},
		}
		return Consume(tr, opts)
	}

	// export drains the queue, sending each batch as a separate export request.
	export := func() error {
		for logs := range queue {
			req := plogotlp.NewExportRequestFromLogs(logs)
			if _, err := client.Export(gctx, req); err != nil {
				return err
			}
		}
		return nil
	}

	g.Go(produce)
	for range n {
		g.Go(export)
	}
	return g.Wait()
}
77 changes: 77 additions & 0 deletions cmd/otelbench/chdump/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"slices"

"github.com/ClickHouse/ch-go/proto"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/go-faster/oteldb/internal/otelstorage"
)
Expand Down Expand Up @@ -75,6 +77,81 @@ func (c *Logs) Reset() {
}
}

// ToOTLP appends data from [Logs] to given batch.
//
// Every row becomes one log record. Records are grouped under a resource
// entry selected by the hash of the row's resource attributes, and under a
// scope entry selected by scope name, version and the hash of the scope
// attributes. Entries already present in batch are reused; missing ones are
// appended.
func (c *Logs) ToOTLP(batch plog.Logs) {
	resources := batch.ResourceLogs()

	// Index resources that are already in the batch by attribute hash.
	byResource := make(map[otelstorage.Hash]plog.ResourceLogs, resources.Len())
	for i := 0; i < resources.Len(); i++ {
		rl := resources.At(i)
		key := otelstorage.Attrs(rl.Resource().Attributes()).Hash()
		byResource[key] = rl
	}

	// resourceFor returns the resource entry for attrs, creating it on first use.
	resourceFor := func(attrs otelstorage.Attrs) plog.ResourceLogs {
		key := attrs.Hash()
		if rl, found := byResource[key]; found {
			return rl
		}

		rl := resources.AppendEmpty()
		attrs.AsMap().CopyTo(rl.Resource().Attributes())
		byResource[key] = rl
		return rl
	}

	// scopeFor returns the scope entry of rl matching name, version and attrs,
	// creating it if no existing entry matches.
	scopeFor := func(rl plog.ResourceLogs, attrs otelstorage.Attrs, name, version string) plog.ScopeLogs {
		scopes := rl.ScopeLogs()
		want := attrs.Hash()

		for i := 0; i < scopes.Len(); i++ {
			candidate := scopes.At(i)
			s := candidate.Scope()
			if s.Name() != name || s.Version() != version {
				continue
			}
			if otelstorage.Attrs(s.Attributes()).Hash() != want {
				continue
			}
			return candidate
		}

		created := scopes.AppendEmpty()
		s := created.Scope()
		s.SetName(name)
		s.SetVersion(version)
		attrs.AsMap().CopyTo(s.Attributes())
		return created
	}

	rows := c.Body.Rows()
	for i := 0; i < rows; i++ {
		// Read all columns of the row before touching the batch.
		var (
			ts         = c.Timestamp.Row(i)
			sevText    = c.SeverityText.Row(i)
			sevNumber  = c.SeverityNumber.Row(i)
			flags      = c.TraceFlags.Row(i)
			traceID    = c.TraceID.Row(i)
			spanID     = c.SpanID.Row(i)
			body       = c.Body.Row(i)
			attrs      = c.Attributes.Row(i)
			resAttrs   = c.Resource.Row(i)
			scopeAttrs = c.Scope.Row(i)
			name       = c.ScopeName.Row(i)
			version    = c.ScopeVersion.Row(i)
		)

		rl := resourceFor(resAttrs)
		sl := scopeFor(rl, scopeAttrs, name, version)
		rec := sl.LogRecords().AppendEmpty()

		rec.SetTimestamp(otelstorage.NewTimestampFromTime(ts))
		rec.SetSeverityText(sevText)
		rec.SetSeverityNumber(plog.SeverityNumber(sevNumber))
		rec.SetFlags(plog.LogRecordFlags(flags))
		rec.SetTraceID(pcommon.TraceID(traceID))
		rec.SetSpanID(pcommon.SpanID(spanID))
		// The body is always stored as a string value.
		rec.Body().SetStr(body)
		attrs.CopyTo(rec.Attributes())
	}
}

func (c *Logs) columns() iter.Seq[proto.ResultColumn] {
return func(yield func(proto.ResultColumn) bool) {
for _, col := range []proto.ResultColumn{
Expand Down

0 comments on commit 0bbc9c7

Please sign in to comment.