Skip to content

Commit

Permalink
deltatocumulative: Linear -> Processor
Browse files Browse the repository at this point in the history
  • Loading branch information
sh0rez committed Nov 22, 2024
1 parent b1a3f12 commit 25f9626
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"go.opentelemetry.io/collector/component"

telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

var _ component.ConfigValidator = (*Config)(nil)
Expand Down
6 changes: 3 additions & 3 deletions processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

func NewFactory() processor.Factory {
Expand All @@ -29,10 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
return nil, fmt.Errorf("configuration parsing error")
}

ltel, err := ltel.New(set.TelemetrySettings)
tel, err := telemetry.New(set.TelemetrySettings)
if err != nil {
return nil, err
}

return newLinear(pcfg, ltel, next), nil
return newProcessor(pcfg, tel, next), nil
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"

import "go.opentelemetry.io/otel/attribute"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

var _ processor.Metrics = (*Linear)(nil)
var _ processor.Metrics = (*Processor)(nil)

type Linear struct {
type Processor struct {
next consumer.Metrics
cfg Config

Expand All @@ -36,10 +36,10 @@ type Linear struct {
tel telemetry.Metrics
}

func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linear {
func newProcessor(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Processor {
ctx, cancel := context.WithCancel(context.Background())

proc := Linear{
proc := Processor{
next: next,
cfg: *cfg,
last: state{
Expand All @@ -60,7 +60,7 @@ func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linea
return &proc
}

func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
return p.next.ConsumeMetrics(ctx, md)
}

func (p *Linear) Start(_ context.Context, _ component.Host) error {
func (p *Processor) Start(_ context.Context, _ component.Host) error {
if p.cfg.MaxStale != 0 {
// delete stale streams once per minute
go func() {
Expand All @@ -166,12 +166,12 @@ func (p *Linear) Start(_ context.Context, _ component.Host) error {
return nil
}

func (p *Linear) Shutdown(_ context.Context) error {
func (p *Processor) Shutdown(_ context.Context) error {
p.cancel()
return nil
}

func (p *Linear) Capabilities() consumer.Capabilities {
func (p *Processor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

Expand Down

0 comments on commit 25f9626

Please sign in to comment.