Skip to content

Commit

Permalink
deltatocumulative: remove nested implementation
Browse files Browse the repository at this point in the history
entirely removes the nested (old) implementation of deltatocumulative,
as all tests pass using the linear one only.
  • Loading branch information
sh0rez committed Nov 22, 2024
1 parent 9196ea5 commit b1a3f12
Show file tree
Hide file tree
Showing 23 changed files with 4 additions and 1,712 deletions.
51 changes: 0 additions & 51 deletions processor/deltatocumulativeprocessor/chain.go

This file was deleted.

5 changes: 1 addition & 4 deletions processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,5 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
return nil, err
}

proc := newProcessor(pcfg, set.Logger, &ltel.TelemetryBuilder, next)
linear := newLinear(pcfg, ltel, proc)

return Chain{linear, proc}, nil
return newLinear(pcfg, ltel, next), nil
}
93 changes: 0 additions & 93 deletions processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,116 +4,23 @@
package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

var (
_ Point[Number] = Number{}
_ Point[Histogram] = Histogram{}
_ Point[ExpHistogram] = ExpHistogram{}
_ Point[Summary] = Summary{}
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Attributes() pcommon.Map

Clone() Self
CopyTo(Self)

Add(Self) Self
}

type Typed[Self any] interface {
Point[Self]
Number | Histogram | ExpHistogram | Summary
}

type Number struct {
pmetric.NumberDataPoint
}

func Zero[P Typed[P]]() P {
var point P
switch ty := any(&point).(type) {
case *Number:
ty.NumberDataPoint = pmetric.NewNumberDataPoint()
case *Histogram:
ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
case *ExpHistogram:
ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
}
return point
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Number) CopyTo(dst Number) {
dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
}

type Histogram struct {
pmetric.HistogramDataPoint
}

func (dp Histogram) Clone() Histogram {
clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Histogram) CopyTo(dst Histogram) {
dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
}

type ExpHistogram struct {
expo.DataPoint
}

func (dp ExpHistogram) Clone() ExpHistogram {
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
if dp.DataPoint != (expo.DataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
dp.DataPoint.CopyTo(dst.DataPoint)
}

type mustPoint[D Point[D]] struct{ _ D }

var (
_ = mustPoint[Number]{}
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)

type Summary struct {
pmetric.SummaryDataPoint
}

func (dp Summary) Clone() Summary {
clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Summary) CopyTo(dst Summary) {
dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
}
54 changes: 0 additions & 54 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,9 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

func New[D data.Point[D]]() Accumulator[D] {
return Accumulator[D]{
Map: make(exp.HashMap[D]),
}
}

var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil)

type Accumulator[D data.Point[D]] struct {
streams.Map[D]
}

func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
aggr, ok := a.Map.Load(id)

// new series: initialize with current sample
if !ok {
clone := dp.Clone()
return a.Map.Store(id, clone)
}

// drop bad samples
switch {
case dp.StartTimestamp() < aggr.StartTimestamp():
// belongs to older series
return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
case dp.Timestamp() <= aggr.Timestamp():
// out of order
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

// detect gaps
var gap error
if dp.StartTimestamp() > aggr.Timestamp() {
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
}

res := aggr.Add(dp)
if err := a.Map.Store(id, res); err != nil {
return err
}
return gap
}

type ErrOlderStart struct {
Start pcommon.Timestamp
Sample pcommon.Timestamp
Expand All @@ -76,14 +30,6 @@ func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}

type ErrGap struct {
From, To pcommon.Timestamp
}

func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}

type Type interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Expand Down
Loading

0 comments on commit b1a3f12

Please sign in to comment.