Skip to content

Commit

Permalink
[chore] Fix usage of setupTestTelemetry (#36394)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Nov 16, 2024
1 parent fbad29c commit c7ecf2c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 21 deletions.
1 change: 1 addition & 0 deletions connector/servicegraphconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ func TestValidateOwnTelemetry(t *testing.T) {
},
},
})
require.NoError(t, set.Shutdown(context.Background()))
}

func TestExtraDimensionsLabels(t *testing.T) {
Expand Down
42 changes: 21 additions & 21 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -35,7 +34,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -62,6 +61,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {

// The final decision SHOULD be Sampled.
require.EqualValues(t, 1, nextConsumer.SpanCount())
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestSamplingPolicyInvertSampled(t *testing.T) {
Expand All @@ -70,8 +70,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -80,7 +79,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -107,6 +106,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {

// The final decision SHOULD be Sampled.
require.EqualValues(t, 1, nextConsumer.SpanCount())
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestSamplingMultiplePolicies(t *testing.T) {
Expand All @@ -115,8 +115,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -127,7 +126,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -158,6 +157,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {

// The final decision SHOULD be Sampled.
require.EqualValues(t, 1, nextConsumer.SpanCount())
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
Expand All @@ -166,8 +166,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -176,7 +175,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -204,6 +203,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {

// The final decision SHOULD be NotSampled.
require.EqualValues(t, 0, nextConsumer.SpanCount())
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
Expand All @@ -212,8 +212,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -224,7 +223,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -255,6 +254,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {

// The final decision SHOULD be NotSampled.
require.EqualValues(t, 0, nextConsumer.SpanCount())
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
Expand All @@ -263,8 +263,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand All @@ -275,7 +274,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -325,6 +324,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
require.EqualValues(t, 1, mpe1.EvaluationCount)
require.EqualValues(t, 1, mpe2.EvaluationCount)
require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored")
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
Expand All @@ -333,8 +333,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
tel := setupTestTelemetry()
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
Expand All @@ -345,7 +344,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -399,4 +398,5 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2)))
require.EqualValues(t, 1, mpe.EvaluationCount)
require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored")
require.NoError(t, tel.Shutdown(context.Background()))
}
9 changes: 9 additions & 0 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
close(groupClaim.messageChan)
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
Expand Down Expand Up @@ -233,6 +234,7 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
cancelFunc()
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
Expand Down Expand Up @@ -331,6 +333,7 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
},
},
})
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
Expand Down Expand Up @@ -531,6 +534,7 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
close(groupClaim.messageChan)
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
Expand Down Expand Up @@ -574,6 +578,7 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
cancelFunc()
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
Expand Down Expand Up @@ -672,6 +677,7 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
},
},
})
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
Expand Down Expand Up @@ -887,6 +893,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
close(groupClaim.messageChan)
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
Expand Down Expand Up @@ -930,6 +937,7 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
groupClaim.messageChan <- &sarama.ConsumerMessage{}
cancelFunc()
wg.Wait()
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
Expand Down Expand Up @@ -1028,6 +1036,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
},
},
})
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions receiver/solacereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestReceiveMessage(t *testing.T) {
if testCase.validation != nil {
testCase.validation(t, tt)
}
require.NoError(t, tt.Shutdown(context.Background()))
})
}
}
Expand Down Expand Up @@ -281,6 +282,7 @@ func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) {
},
},
})
require.NoError(t, tt.Shutdown(context.Background()))
}

func TestReceiverLifecycle(t *testing.T) {
Expand Down Expand Up @@ -413,6 +415,7 @@ func TestReceiverLifecycle(t *testing.T) {
},
},
})
require.NoError(t, tt.Shutdown(context.Background()))
}

func TestReceiverDialFailureContinue(t *testing.T) {
Expand Down Expand Up @@ -542,6 +545,7 @@ func TestReceiverDialFailureContinue(t *testing.T) {
},
},
})
require.NoError(t, tt.Shutdown(context.Background()))
}

func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) {
Expand Down Expand Up @@ -656,6 +660,7 @@ func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) {
})
err = receiver.Shutdown(context.Background())
assert.NoError(t, err)
require.NoError(t, tt.Shutdown(context.Background()))
}

func TestReceiverFlowControlDelayedRetry(t *testing.T) {
Expand Down Expand Up @@ -935,6 +940,7 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) {
},
})
}
require.NoError(t, tt.Shutdown(context.Background()))
})
}
}
Expand Down Expand Up @@ -1153,6 +1159,7 @@ func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) {
},
},
})
require.NoError(t, tt.Shutdown(context.Background()))
}

func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller, componentTestTelemetry) {
Expand Down

0 comments on commit c7ecf2c

Please sign in to comment.