Skip to content

Commit

Permalink
feat: adds trace exporter override and jaeger exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
nilslice committed Sep 22, 2023
1 parent 6c2a358 commit 1a22f48
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 20 deletions.
27 changes: 25 additions & 2 deletions go/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type TraceEvent struct {

// Shared implementation for all Adapters
type AdapterBase struct {
TraceEvents chan TraceEvent
TraceEvents chan TraceEvent
OtelTraceExporter OtelTraceExporter

stop chan bool
eventBucket *EventBucket
flusher Flusher
Expand Down Expand Up @@ -89,7 +91,7 @@ func (b *AdapterBase) Stop(wait bool) {
// MakeOtelCallSpans recursively constructs call spans in open telemetry format
func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string) []*trace.Span {
name := event.FunctionName()
span := NewOtelSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration))
span := b.NewOtelSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration))
span.Attributes = append(span.Attributes, NewOtelKeyValueString("function-name", fmt.Sprintf("function-call-%s", name)))

spans := []*trace.Span{span}
Expand All @@ -112,6 +114,27 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI
return spans
}

func (b AdapterBase) NewOtelSpan(traceId string, parentId []byte, name string, start, end time.Time) *trace.Span {
if parentId == nil {
parentId = []byte{}
}

if b.OtelTraceExporter != nil {
return b.OtelTraceExporter(traceId, parentId, name, start, end)
}

return &trace.Span{
TraceId: []byte(traceId),
SpanId: []byte(NewSpanId().ToHex8()),
ParentSpanId: parentId,
Name: name,
Kind: 1,
StartTimeUnixNano: uint64(start.UnixNano()),
EndTimeUnixNano: uint64(end.UnixNano()),
// uses empty defaults for remaining fields...
}
}

// Definition of how to filter our Spans to reduce noise
type SpanFilter struct {
MinDuration time.Duration
Expand Down
33 changes: 33 additions & 0 deletions go/adapter/opentelemetry/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package opentelemetry

import (
"context"
"encoding/binary"
"encoding/hex"
"log"
"time"

Expand Down Expand Up @@ -43,6 +45,10 @@ func (a *OTelAdapter) UseCustomClient(client otlptrace.Client) {
}
}

func (a *OTelAdapter) UseTraceExporter(exporter observe.OtelTraceExporter) {
a.AdapterBase.OtelTraceExporter = exporter
}

// NewOTelAdapter will create an instance of an OTelAdapter using the configuration to construct
// an otlptrace.Client based on the Protocol set in the config.
func NewOTelAdapter(config *OTelConfig) *OTelAdapter {
Expand Down Expand Up @@ -83,6 +89,32 @@ func NewOTelAdapter(config *OTelConfig) *OTelAdapter {
return adapter
}

func JaegerTraceExporter(traceId string, parentId []byte, name string, start, end time.Time) *trace.Span {
if parentId == nil {
parentId = []byte{}
}

traceIdB, err := hex.DecodeString(traceId)
if err != nil {
panic(err)
}

spanId := observe.NewSpanId().Msb()
spanIdB := make([]byte, 8)
binary.LittleEndian.PutUint64(spanIdB, spanId)

return &trace.Span{
TraceId: traceIdB,
SpanId: spanIdB,
ParentSpanId: parentId,
Name: name,
Kind: 1,
StartTimeUnixNano: uint64(start.UnixNano()),
EndTimeUnixNano: uint64(end.UnixNano()),
// uses empty defaults for remaining fields...
}
}

func (h *OTelAdapter) Start(ctx context.Context) {
h.AdapterBase.Start(ctx, h)
h.Config.client.Start(ctx)
Expand Down Expand Up @@ -129,6 +161,7 @@ func (h *OTelAdapter) Flush(evts []observe.TraceEvent) error {
}

t := observe.NewOtelTrace(traceId, h.Config.ServiceName, allSpans)

if te.AdapterMeta != nil {
meta, ok := te.AdapterMeta.(map[string]string)
if ok {
Expand Down
6 changes: 4 additions & 2 deletions go/bin/opentelemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ func main() {
ServiceName: "golang",
EmitTracesInterval: time.Second * 1,
TraceBatchMax: 100,
Endpoint: "localhost:4318",
Protocol: opentelemetry.HTTP,
Endpoint: "localhost:4317",
Protocol: opentelemetry.GRPC,
AllowInsecure: true, // for localhost in dev via http
}
adapter := opentelemetry.NewOTelAdapter(conf)
// use an exporter which satisfies the observe.TraceExporter function
adapter.UseTraceExporter(opentelemetry.JaegerTraceExporter)
defer adapter.StopWithContext(ctx, true)
adapter.Start(ctx)

Expand Down
46 changes: 30 additions & 16 deletions go/otel_formatter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package observe

import (
"encoding/binary"
"encoding/hex"
"time"

common "go.opentelemetry.io/proto/otlp/common/v1"
Expand All @@ -13,6 +15,8 @@ type OtelTrace struct {
TracesData *trace.TracesData
}

type OtelTraceExporter func(traceId string, parentId []byte, name string, start, end time.Time) *trace.Span

func NewOtelTrace(traceId string, serviceName string, spans []*trace.Span) *OtelTrace {
return &OtelTrace{
TraceId: traceId,
Expand Down Expand Up @@ -47,22 +51,6 @@ func (t *OtelTrace) SetMetadata(te *TraceEvent, meta map[string]string) {
}
}

func NewOtelSpan(traceId string, parentId []byte, name string, start, end time.Time) *trace.Span {
if parentId == nil {
parentId = []byte{}
}
return &trace.Span{
TraceId: []byte(traceId),
SpanId: []byte(NewSpanId().ToHex8()),
ParentSpanId: parentId,
Name: name,
Kind: 1,
StartTimeUnixNano: uint64(start.UnixNano()),
EndTimeUnixNano: uint64(end.UnixNano()),
// uses empty defaults for remaining fields...
}
}

func NewOtelKeyValueString(key string, value string) *common.KeyValue {
strVal := &common.AnyValue_StringValue{
StringValue: value,
Expand Down Expand Up @@ -118,3 +106,29 @@ func AddOtelKeyValueInt64(kvs ...*common.KeyValue) *common.KeyValue {
}
return nil
}

func NewOtelJagerSpan(traceId string, parentId []byte, name string, start, end time.Time) *trace.Span {
if parentId == nil {
parentId = []byte{}
}

traceIdB, err := hex.DecodeString(traceId)
if err != nil {
panic(err)
}

spanId := NewSpanId().msb
spanIdB := make([]byte, 8)
binary.LittleEndian.PutUint64(spanIdB, spanId)

return &trace.Span{
TraceId: traceIdB,
SpanId: spanIdB,
ParentSpanId: parentId,
Name: name,
Kind: 1,
StartTimeUnixNano: uint64(start.UnixNano()),
EndTimeUnixNano: uint64(end.UnixNano()),
// uses empty defaults for remaining fields...
}
}
48 changes: 48 additions & 0 deletions go/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package observe

import (
"encoding/hex"
"fmt"
"log"
"math/rand"
"strconv"
"time"
)

Expand Down Expand Up @@ -36,6 +39,29 @@ func NewSpanId() TelemetryId {
}
}

func TelemetryIdFromString(tid string) (TelemetryId, error) {
id, err := strconv.ParseInt(tid, 10, 64)
if err != nil {
return TelemetryId{}, nil
}

return TelemetryId{
msb: uint64(id) << 4,
lsb: uint64(id) << 4,
}, nil
}

type TraceId struct{ TelemetryId }
type SpanId struct{ TelemetryId }

func (id TelemetryId) Msb() uint64 {
return id.msb
}

func (id TelemetryId) Lsb() uint64 {
return id.lsb
}

// Encode this id into an 8 byte hex (16 chars)
// Just uses the least significant of the 16 bytes
func (t TelemetryId) ToHex8() string {
Expand All @@ -52,3 +78,25 @@ func (t TelemetryId) ToHex16() string {
func (t TelemetryId) ToUint64() uint64 {
return t.lsb
}

func (t TelemetryId) ToRawTraceIdBytes() []byte {
traceId := t.ToHex16()
traceIdB, err := hex.DecodeString(traceId)
if err != nil {
log.Println(traceId, "convert traceid to raw bytes:", err)
return make([]byte, 0)
}

return traceIdB
}

func (t TelemetryId) ToRawSpanIdBytes() []byte {
spanId := t.ToHex8()
spanIdB, err := hex.DecodeString(spanId)
if err != nil {
log.Println(spanId, "convert spanid to raw bytes:", err)
return make([]byte, 0)
}

return spanIdB
}

0 comments on commit 1a22f48

Please sign in to comment.