Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip attributes if the value is nan #465

Merged
merged 14 commits into from
Dec 2, 2024
23 changes: 17 additions & 6 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,14 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
IsColumn: false,
}
if v.Type() == pcommon.ValueTypeDouble {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(v.Double()) {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k))
return true
}
} else if v.Type() == pcommon.ValueTypeInt {
numberTagMap[k] = float64(v.Int())
spanAttribute.NumberValue = float64(v.Int())
Expand All @@ -329,9 +334,15 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
}
resourceAttrs[k] = v.AsString()
if v.Type() == pcommon.ValueTypeDouble {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(v.Double()) {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k))
return true
}

} else if v.Type() == pcommon.ValueTypeInt {
numberTagMap[k] = float64(v.Int())
spanAttribute.NumberValue = float64(v.Int())
Expand Down
36 changes: 23 additions & 13 deletions exporter/clickhousetracesexporter/clickhouse_exporter_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) {
}

if value.Type() == pcommon.ValueTypeDouble {
attrMap.NumberMap[key] = value.Double()
spanAttribute.NumberValue = value.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(value.Double()) {
attrMap.NumberMap[key] = value.Double()
spanAttribute.NumberValue = value.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", key))
return
}
} else if value.Type() == pcommon.ValueTypeInt {
attrMap.NumberMap[key] = float64(value.Int())
spanAttribute.NumberValue = float64(value.Int())
Expand All @@ -141,9 +146,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) {
tSpanAttribute.StringValue = tempVal
tSpanAttribute.DataType = "string"
case float64:
attrMap.NumberMap[tempKey] = tempVal
tSpanAttribute.NumberValue = tempVal
tSpanAttribute.DataType = "float64"
if utils.IsValidFloat(tempVal) {
attrMap.NumberMap[tempKey] = tempVal
tSpanAttribute.NumberValue = tempVal
tSpanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", tempKey))
continue
}
case bool:
attrMap.BoolMap[tempKey] = tempVal
tSpanAttribute.DataType = "bool"
Expand Down Expand Up @@ -330,12 +340,14 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error {

structuredSpan, err := newStructuredSpanV3(uint64(lBucketStart), fp, span, serviceName, rs.Resource(), s.config)
if err != nil {
zap.S().Error("Error in creating newStructuredSpanV3: ", err)
return err
return fmt.Errorf("failed to create newStructuredSpanV3: %w", err)
}
batchOfSpans = append(batchOfSpans, structuredSpan)

serializedStructuredSpan, _ := json.Marshal(structuredSpan)
serializedStructuredSpan, err := json.Marshal(structuredSpan)
if err != nil {
return fmt.Errorf("failed to marshal structured span: %w", err)
}
size += len(serializedStructuredSpan)
count += 1
}
Expand All @@ -348,15 +360,13 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error {

err := s.Writer.WriteBatchOfSpansV3(ctx, batchOfSpans, metrics)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
return err
return fmt.Errorf("error in writing spans to clickhouse: %w", err)
}

// write the resources
err = s.Writer.WriteResourcesV3(ctx, resourcesSeen)
if err != nil {
zap.S().Error("Error in writing resources to clickhouse: ", err)
return err
return fmt.Errorf("error in writing resources to clickhouse: %w", err)
}

return nil
Expand Down
26 changes: 10 additions & 16 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.indexTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for index table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for index table: %w", err)
}

for _, span := range batchSpans {
Expand Down Expand Up @@ -155,8 +154,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
span.SpanKind,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}

Expand Down Expand Up @@ -188,8 +186,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.spansTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for model table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for model table: %w", err)
}

metrics := map[string]usage.Metric{}
Expand All @@ -199,18 +196,17 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er
usageMap.TagMap = map[string]string{}
serialized, err = json.Marshal(span.TraceModel)
if err != nil {
return err
return fmt.Errorf("could not marshal trace model: %w", err)
}
serializedUsage, err := json.Marshal(usageMap)

serializedUsage, err := json.Marshal(usageMap)
if err != nil {
return err
return fmt.Errorf("could not marshal usage map: %w", err)
}

err = statement.Append(time.Unix(0, int64(span.StartTimeUnixNano)), span.TraceId, string(serialized))
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}

if !w.useNewSchema {
Expand All @@ -227,7 +223,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
if err != nil {
return err
return fmt.Errorf("could not send batch to model table: %w", err)
}

if !w.useNewSchema {
Expand All @@ -244,16 +240,14 @@ func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error
// inserts to the singoz_spans table
if w.spansTable != "" {
if err := w.writeModelBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
return err
return fmt.Errorf("could not write a batch of spans to model table: %w", err)
}
}

// inserts to the signoz_index_v2 table
if w.indexTable != "" {
if err := w.writeIndexBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
return err
return fmt.Errorf("could not write a batch of spans to index table: %w", err)
}
}

Expand Down
30 changes: 10 additions & 20 deletions exporter/clickhousetracesexporter/writerV3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf(insertTraceSQLTemplateV2, w.traceDatabase, w.indexTableV3), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for index table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for index table: %w", err)
}
for _, span := range batchSpans {
err = statement.Append(
Expand Down Expand Up @@ -67,8 +66,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3
span.IsRemote,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}

Expand All @@ -95,8 +93,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.errorTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for error table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for error table: %w", err)
}

for _, span := range batchSpans {
Expand All @@ -118,8 +115,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3
span.ResourcesString,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}
}
Expand Down Expand Up @@ -151,13 +147,11 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
}()
tagStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for span attributes table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for span attributes table due to error: %w", err)
}
tagKeyStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeKeyTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for span attributes key table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for span attributes key table due to error: %w", err)
}
// create map of span attributes of key, tagType, dataType and isColumn to avoid duplicates in batch
mapOfSpanAttributeKeys := make(map[string]struct{})
Expand Down Expand Up @@ -192,8 +186,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
spanAttribute.IsColumn,
)
if err != nil {
w.logger.Error("Could not append span to tagKey Statement to batch due to error: ", zap.Error(err), zap.Any("span", span))
return err
return fmt.Errorf("could not append span to tagKey Statement to batch due to error: %w", err)
}
}
// add mapOfSpanAttributeKey to map
Expand Down Expand Up @@ -231,8 +224,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
)
}
if err != nil {
w.logger.Error("Could not append span to tag Statement batch due to error: ", zap.Error(err), zap.Any("span", span))
return err
return fmt.Errorf("could not append span to tag Statement batch due to error: %w", err)
}
}
}
Expand All @@ -247,8 +239,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())),
)
if err != nil {
w.logger.Error("Could not write to span attributes table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not write to span attributes table due to error: %w", err)
}

tagKeyStart := time.Now()
Expand All @@ -261,8 +252,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
writeLatencyMillis.M(int64(time.Since(tagKeyStart).Milliseconds())),
)
if err != nil {
w.logger.Error("Could not write to span attributes key table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not write to span attributes key table due to error: %w", err)
}

return err
Expand Down
6 changes: 6 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"encoding/hex"
"math"

"go.opentelemetry.io/collector/pdata/pcommon"
)
Expand All @@ -19,3 +20,8 @@ func SpanIDToHexOrEmptyString(spanID pcommon.SpanID) string {
}
return ""
}

func IsValidFloat(value float64) bool {
// Check for NaN, +/-Inf
return !math.IsNaN(value) && !math.IsInf(value, 0)
}