From c7ecf2c02cec13afc871f67c5bc16aff43abe4fb Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 15 Nov 2024 16:51:17 -0800 Subject: [PATCH] [chore] Fix usage of setupTestTelemetry (#36394) Signed-off-by: Bogdan Drutu --- .../servicegraphconnector/connector_test.go | 1 + .../processor_decisions_test.go | 42 +++++++++---------- receiver/kafkareceiver/kafka_receiver_test.go | 9 ++++ receiver/solacereceiver/receiver_test.go | 7 ++++ 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/connector/servicegraphconnector/connector_test.go b/connector/servicegraphconnector/connector_test.go index 3b6e1a4beda7..8dba2717a06f 100644 --- a/connector/servicegraphconnector/connector_test.go +++ b/connector/servicegraphconnector/connector_test.go @@ -606,6 +606,7 @@ func TestValidateOwnTelemetry(t *testing.T) { }, }, }) + require.NoError(t, set.Shutdown(context.Background())) } func TestExtraDimensionsLabels(t *testing.T) { diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index e50ea4f7ce59..c198316fc418 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -25,8 +25,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -35,7 +34,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -62,6 +61,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { // The final decision SHOULD be Sampled. require.EqualValues(t, 1, nextConsumer.SpanCount()) + require.NoError(t, tel.Shutdown(context.Background())) } func TestSamplingPolicyInvertSampled(t *testing.T) { @@ -70,8 +70,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -80,7 +79,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -107,6 +106,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { // The final decision SHOULD be Sampled. require.EqualValues(t, 1, nextConsumer.SpanCount()) + require.NoError(t, tel.Shutdown(context.Background())) } func TestSamplingMultiplePolicies(t *testing.T) { @@ -115,8 +115,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -127,7 +126,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -158,6 +157,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { // The final decision SHOULD be Sampled. require.EqualValues(t, 1, nextConsumer.SpanCount()) + require.NoError(t, tel.Shutdown(context.Background())) } func TestSamplingPolicyDecisionNotSampled(t *testing.T) { @@ -166,8 +166,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -176,7 +175,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -204,6 +203,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { // The final decision SHOULD be NotSampled. require.EqualValues(t, 0, nextConsumer.SpanCount()) + require.NoError(t, tel.Shutdown(context.Background())) } func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { @@ -212,8 +212,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -224,7 +223,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -255,6 +254,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { // The final decision SHOULD be NotSampled. require.EqualValues(t, 0, nextConsumer.SpanCount()) + require.NoError(t, tel.Shutdown(context.Background())) } func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { @@ -263,8 +263,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -275,7 +274,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -325,6 +324,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { require.EqualValues(t, 1, mpe1.EvaluationCount) require.EqualValues(t, 1, mpe2.EvaluationCount) require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored") + require.NoError(t, tel.Shutdown(context.Background())) } func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { @@ -333,8 +333,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings() + tel := setupTestTelemetry() idb := newSyncIDBatcher() mpe := &mockPolicyEvaluator{} @@ -345,7 +344,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { // Use this instead of the default no-op cache c, err := cache.NewLRUDecisionCache[bool](200) require.NoError(t, err) - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c)) + p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -399,4 +398,5 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2))) require.EqualValues(t, 1, mpe.EvaluationCount) require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored") + require.NoError(t, tel.Shutdown(context.Background())) } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 61693d58eced..e353974acd91 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -189,6 +189,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} close(groupClaim.messageChan) wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestTracesConsumerGroupHandler_session_done(t *testing.T) { @@ -233,6 +234,7 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} cancelFunc() wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { @@ -331,6 +333,7 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { @@ -531,6 +534,7 @@ func TestMetricsConsumerGroupHandler(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} close(groupClaim.messageChan) wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { @@ -574,6 +578,7 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} cancelFunc() wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { @@ -672,6 +677,7 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { @@ -887,6 +893,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} close(groupClaim.messageChan) wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestLogsConsumerGroupHandler_session_done(t *testing.T) { @@ -930,6 +937,7 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { groupClaim.messageChan <- &sarama.ConsumerMessage{} cancelFunc() wg.Wait() + require.NoError(t, tel.Shutdown(context.Background())) } func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { @@ -1028,6 +1036,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { diff --git a/receiver/solacereceiver/receiver_test.go b/receiver/solacereceiver/receiver_test.go index d6ce7913ce2a..e68850ae8c23 100644 --- a/receiver/solacereceiver/receiver_test.go +++ b/receiver/solacereceiver/receiver_test.go @@ -217,6 +217,7 @@ func TestReceiveMessage(t *testing.T) { if testCase.validation != nil { testCase.validation(t, tt) } + require.NoError(t, tt.Shutdown(context.Background())) }) } } @@ -281,6 +282,7 @@ func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) { }, }, }) + require.NoError(t, tt.Shutdown(context.Background())) } func TestReceiverLifecycle(t *testing.T) { @@ -413,6 +415,7 @@ func TestReceiverLifecycle(t *testing.T) { }, }, }) + require.NoError(t, tt.Shutdown(context.Background())) } func TestReceiverDialFailureContinue(t *testing.T) { @@ -542,6 +545,7 @@ func TestReceiverDialFailureContinue(t *testing.T) { }, }, }) + require.NoError(t, tt.Shutdown(context.Background())) } func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) { @@ -656,6 +660,7 @@ func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) { }) err = receiver.Shutdown(context.Background()) assert.NoError(t, err) + require.NoError(t, tt.Shutdown(context.Background())) } func TestReceiverFlowControlDelayedRetry(t *testing.T) { @@ -935,6 +940,7 @@ func TestReceiverFlowControlDelayedRetry(t *testing.T) { }, }) } + require.NoError(t, tt.Shutdown(context.Background())) }) } } @@ -1153,6 +1159,7 @@ func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) { }, }, }) + require.NoError(t, tt.Shutdown(context.Background())) } func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller, componentTestTelemetry) {