-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[chore] [deltatocumulative]: linear histograms #36486
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package deltatocumulativeprocessor | ||
|
||
import ( | ||
"context" | ||
"math/rand/v2" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" | ||
) | ||
|
||
var out *consumertest.MetricsSink | ||
|
||
func BenchmarkProcessor(gb *testing.B) { | ||
const ( | ||
metrics = 5 | ||
streams = 10 | ||
) | ||
|
||
type Case struct { | ||
name string | ||
fill func(m pmetric.Metric) | ||
next func(m pmetric.Metric) | ||
} | ||
|
||
run := func(b *testing.B, proc consumer.Metrics, cs Case) { | ||
md := pmetric.NewMetrics() | ||
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() | ||
for i := range metrics { | ||
m := ms.AppendEmpty() | ||
m.SetName(strconv.Itoa(i)) | ||
cs.fill(m) | ||
} | ||
|
||
b.ReportAllocs() | ||
b.ResetTimer() | ||
b.StopTimer() | ||
|
||
ctx := context.Background() | ||
for range b.N { | ||
for i := range ms.Len() { | ||
cs.next(ms.At(i)) | ||
} | ||
req := pmetric.NewMetrics() | ||
md.CopyTo(req) | ||
|
||
b.StartTimer() | ||
err := proc.ConsumeMetrics(ctx, req) | ||
b.StopTimer() | ||
require.NoError(b, err) | ||
} | ||
} | ||
Comment on lines
+39
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any special reason to transform this into a function? We're not reusing the code anywhere, so why not just put this inside the b.Run loop? |
||
|
||
now := time.Now() | ||
start := pcommon.NewTimestampFromTime(now) | ||
ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) | ||
|
||
cases := []Case{{ | ||
name: "sums", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to split sums, histograms, and exponential histograms into separate benchmarks? Are those metric types expected to be split by separate deltatocumulative processors in real-world scenarios? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your PR description only shows results for |
||
fill: func(m pmetric.Metric) { | ||
sum := m.SetEmptySum() | ||
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) | ||
for i := range streams { | ||
dp := sum.DataPoints().AppendEmpty() | ||
dp.SetIntValue(int64(rand.IntN(10))) | ||
dp.Attributes().PutStr("idx", strconv.Itoa(i)) | ||
dp.SetStartTimestamp(start) | ||
dp.SetTimestamp(ts) | ||
} | ||
}, | ||
next: next(pmetric.Metric.Sum), | ||
}, { | ||
name: "histogram", | ||
fill: func(m pmetric.Metric) { | ||
hist := m.SetEmptyHistogram() | ||
hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) | ||
for i := range streams { | ||
dp := hist.DataPoints().AppendEmpty() | ||
histo.DefaultBounds.Observe( | ||
float64(rand.IntN(1000)), | ||
float64(rand.IntN(1000)), | ||
float64(rand.IntN(1000)), | ||
float64(rand.IntN(1000)), | ||
).CopyTo(dp) | ||
|
||
dp.SetStartTimestamp(start) | ||
dp.SetTimestamp(ts) | ||
dp.Attributes().PutStr("idx", strconv.Itoa(i)) | ||
} | ||
}, | ||
next: next(pmetric.Metric.Histogram), | ||
}, { | ||
name: "exponential", | ||
fill: func(m pmetric.Metric) { | ||
ex := m.SetEmptyExponentialHistogram() | ||
ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) | ||
for i := range streams { | ||
dp := ex.DataPoints().AppendEmpty() | ||
o := expotest.Observe(expo.Scale(2), | ||
float64(rand.IntN(31)+1), | ||
float64(rand.IntN(31)+1), | ||
float64(rand.IntN(31)+1), | ||
float64(rand.IntN(31)+1), | ||
) | ||
o.CopyTo(dp.Positive()) | ||
o.CopyTo(dp.Negative()) | ||
|
||
dp.SetStartTimestamp(start) | ||
dp.SetTimestamp(ts) | ||
dp.Attributes().PutStr("idx", strconv.Itoa(i)) | ||
} | ||
}, | ||
next: next(pmetric.Metric.ExponentialHistogram), | ||
}} | ||
|
||
tel := func(n int) sdktest.Spec { | ||
total := int64(n * metrics * streams) | ||
tracked := int64(metrics * streams) | ||
return sdktest.Expect(map[string]sdktest.Metric{ | ||
"otelcol_deltatocumulative.datapoints.linear": { | ||
Type: sdktest.TypeSum, | ||
Numbers: []sdktest.Number{{Int: &total}}, | ||
Monotonic: true, | ||
}, | ||
"otelcol_deltatocumulative.streams.tracked.linear": { | ||
Type: sdktest.TypeSum, | ||
Numbers: []sdktest.Number{{Int: &tracked}}, | ||
}, | ||
}) | ||
} | ||
|
||
for _, cs := range cases { | ||
gb.Run(cs.name, func(b *testing.B) { | ||
st := setup(b, nil) | ||
out = st.sink | ||
run(b, st.proc, cs) | ||
|
||
// verify all dps are processed without error | ||
b.StopTimer() | ||
if err := sdktest.Test(tel(b.N), st.tel.reader); err != nil { | ||
b.Fatal(err) | ||
} | ||
Comment on lines
+153
to
+155
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a benchmark or a test? I'm unsure if I'm missing something, but it seems you're trying to do both...? Is it an option to split them to make the code easier to understand? Trying to accomplish all things at once also makes the code more fragile since we will also break all things at once if we make a mistake in the future |
||
}) | ||
} | ||
} | ||
|
||
func next[ | ||
T interface{ DataPoints() Ps }, | ||
Ps interface { | ||
At(int) P | ||
Len() int | ||
}, | ||
P interface { | ||
Timestamp() pcommon.Timestamp | ||
SetStartTimestamp(pcommon.Timestamp) | ||
SetTimestamp(pcommon.Timestamp) | ||
}, | ||
](sel func(pmetric.Metric) T) func(m pmetric.Metric) { | ||
Comment on lines
+160
to
+171
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not understand any of this 😭. What are we trying to accomplish here? What is I don't mind code duplication if it makes the code more readable 😬 |
||
return func(m pmetric.Metric) { | ||
dps := sel(m).DataPoints() | ||
for i := range dps.Len() { | ||
dp := dps.At(i) | ||
dp.SetStartTimestamp(dp.Timestamp()) | ||
dp.SetTimestamp(pcommon.NewTimestampFromTime( | ||
dp.Timestamp().AsTime().Add(time.Minute), | ||
)) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"fmt" | ||
|
||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
|
||
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" | ||
|
@@ -83,10 +84,17 @@ func (e ErrGap) Error() string { | |
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) | ||
} | ||
|
||
type Type interface { | ||
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | ||
|
||
StartTimestamp() pcommon.Timestamp | ||
Timestamp() pcommon.Timestamp | ||
} | ||
|
||
// AccumulateInto adds state and dp, storing the result in state | ||
// | ||
// state = state + dp | ||
func AccumulateInto[P data.Point[P]](state P, dp P) error { | ||
func AccumulateInto[T Type](state, dp T) error { | ||
switch { | ||
case dp.StartTimestamp() < state.StartTimestamp(): | ||
// belongs to older series | ||
|
@@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error { | |
return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} | ||
} | ||
|
||
state.Add(dp) | ||
switch dp := any(dp).(type) { | ||
case pmetric.NumberDataPoint: | ||
state := any(state).(pmetric.NumberDataPoint) | ||
data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp}) | ||
case pmetric.HistogramDataPoint: | ||
state := any(state).(pmetric.HistogramDataPoint) | ||
data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp}) | ||
case pmetric.ExponentialHistogramDataPoint: | ||
state := any(state).(pmetric.ExponentialHistogramDataPoint) | ||
data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp}) | ||
} | ||
Comment on lines
+107
to
+117
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This refactor effectively eliminates the need for the data package, as we no longer rely on type characteristics. I'll refactor datapoint addition in a future PR, making this part more clear, maybe like this: var add data.Aggregator = new(data.Add)
switch into := any(dp).(type) {
case pmetric.NumberDataPoint:
add.Numbers(into, dp)
case pmetric.HistogramDataPoint:
add.Histograms(into, dp)
} |
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" | ||
|
||
import "go.opentelemetry.io/otel/attribute" | ||
|
||
type Attributes []attribute.KeyValue | ||
|
||
func (a *Attributes) Set(attr attribute.KeyValue) { | ||
*a = append(*a, attr) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { | |
return pmetric.AggregationTemporalityUnspecified | ||
} | ||
|
||
func (m Metric) Typed() any { | ||
func (m Metric) Typed() Any { | ||
//exhaustive:enforce | ||
switch m.Type() { | ||
case pmetric.MetricTypeSum: | ||
|
@@ -63,3 +63,49 @@ func (m Metric) Typed() any { | |
} | ||
panic("unreachable") | ||
} | ||
|
||
var ( | ||
_ Any = Sum{} | ||
_ Any = Gauge{} | ||
_ Any = ExpHistogram{} | ||
_ Any = Histogram{} | ||
_ Any = Summary{} | ||
) | ||
Comment on lines
+67
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, why do we need to do this? |
||
|
||
type Any interface { | ||
Len() int | ||
Ident() identity.Metric | ||
|
||
SetAggregationTemporality(pmetric.AggregationTemporality) | ||
} | ||
|
||
func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) { | ||
mid := m.Ident() | ||
switch m.Type() { | ||
case pmetric.MetricTypeSum: | ||
m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { | ||
id := identity.OfStream(mid, dp) | ||
return !ok(id, dp) | ||
}) | ||
case pmetric.MetricTypeGauge: | ||
m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { | ||
id := identity.OfStream(mid, dp) | ||
return !ok(id, dp) | ||
}) | ||
case pmetric.MetricTypeHistogram: | ||
m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool { | ||
id := identity.OfStream(mid, dp) | ||
return !ok(id, dp) | ||
}) | ||
case pmetric.MetricTypeExponentialHistogram: | ||
m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { | ||
id := identity.OfStream(mid, dp) | ||
return !ok(id, dp) | ||
}) | ||
case pmetric.MetricTypeSummary: | ||
m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool { | ||
id := identity.OfStream(mid, dp) | ||
return !ok(id, dp) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we remove the abstraction of
run
, we could also move this closer to where it's used.