Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle backpressure earlier in pipeline #2371

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
### Enhancements

- Cache parsed DSN ([#2365](https://github.com/getsentry/sentry-dart/pull/2365))

- Handle backpressure earlier in pipeline ([#2371](https://github.com/getsentry/sentry-dart/pull/2371))
- Drops max un-awaited parallel tasks earlier, so event processors & callbacks are not executed for them.
denrase marked this conversation as resolved.
Show resolved Hide resolved
- Change by setting `SentryOptions.maxQueueSize`. Default is 30.

## 8.10.0-beta.2

### Fixes
Expand Down
69 changes: 49 additions & 20 deletions dart/lib/src/sentry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import 'sentry_user_feedback.dart';
import 'tracing.dart';
import 'sentry_attachment/sentry_attachment.dart';
import 'transport/data_category.dart';
import 'transport/task_queue.dart';

/// Configuration options callback
typedef OptionsConfiguration = FutureOr<void> Function(SentryOptions);
Expand All @@ -34,6 +36,7 @@
/// Sentry SDK main entry point
class Sentry {
static Hub _hub = NoOpHub();
static TaskQueue<SentryId> _taskQueue = NoOpTaskQueue();

Sentry._();

Expand All @@ -56,6 +59,11 @@
if (config is Future) {
await config;
}
_taskQueue = DefaultTaskQueue<SentryId>(
sentryOptions.maxQueueSize,
sentryOptions.logger,
sentryOptions.recorder,
);
} catch (exception, stackTrace) {
sentryOptions.logger(
SentryLevel.error,
Expand Down Expand Up @@ -181,12 +189,17 @@
Hint? hint,
ScopeCallback? withScope,
}) =>
_hub.captureEvent(
event,
stackTrace: stackTrace,
hint: hint,
withScope: withScope,
);
_taskQueue.enqueue(
() => _hub.captureEvent(
event,
stackTrace: stackTrace,
hint: hint,
withScope: withScope,
),
SentryId.empty(),
event.type != null
? DataCategory.fromItemType(event.type!)

Check warning on line 201 in dart/lib/src/sentry.dart

View check run for this annotation

Codecov / codecov/patch

dart/lib/src/sentry.dart#L201

Added line #L201 was not covered by tests
: DataCategory.unknown);

/// Reports the [throwable] and optionally its [stackTrace] to Sentry.io.
static Future<SentryId> captureException(
Expand All @@ -195,11 +208,15 @@
Hint? hint,
ScopeCallback? withScope,
}) =>
_hub.captureException(
throwable,
stackTrace: stackTrace,
hint: hint,
withScope: withScope,
_taskQueue.enqueue(
() => _hub.captureException(
throwable,
stackTrace: stackTrace,
hint: hint,
withScope: withScope,
),
SentryId.empty(),
DataCategory.error,
);

/// Reports a [message] to Sentry.io.
Expand All @@ -211,13 +228,17 @@
Hint? hint,
ScopeCallback? withScope,
}) =>
_hub.captureMessage(
message,
level: level,
template: template,
params: params,
hint: hint,
withScope: withScope,
_taskQueue.enqueue(
() => _hub.captureMessage(
message,
level: level,
template: template,
params: params,
hint: hint,
withScope: withScope,
),
SentryId.empty(),
DataCategory.unknown,
);

/// Reports a [userFeedback] to Sentry.io.
Expand All @@ -236,7 +257,15 @@
Hint? hint,
ScopeCallback? withScope,
}) =>
_hub.captureFeedback(feedback, hint: hint, withScope: withScope);
_taskQueue.enqueue(
() => _hub.captureFeedback(
feedback,
hint: hint,
withScope: withScope,
),
SentryId.empty(),
DataCategory.unknown,
);

/// Close the client SDK
static Future<void> close() async {
Expand All @@ -251,7 +280,7 @@
/// Last event id recorded by the current Hub
static SentryId get lastEventId => _hub.lastEventId;

/// Adds a breacrumb to the current Scope
/// Adds a breadcrumb to the current Scope
static Future<void> addBreadcrumb(Breadcrumb crumb, {Hint? hint}) =>
_hub.addBreadcrumb(crumb, hint: hint);

Expand Down
10 changes: 1 addition & 9 deletions dart/lib/src/sentry_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import 'transport/http_transport.dart';
import 'transport/noop_transport.dart';
import 'transport/rate_limiter.dart';
import 'transport/spotlight_http_transport.dart';
import 'transport/task_queue.dart';
import 'utils/isolate_utils.dart';
import 'utils/regex_utils.dart';
import 'utils/stacktrace_utils.dart';
Expand All @@ -39,10 +38,6 @@ const _defaultIpAddress = '{{auto}}';
/// Logs crash reports and events to the Sentry.io service.
class SentryClient {
final SentryOptions _options;
late final _taskQueue = TaskQueue<SentryId?>(
_options.maxQueueSize,
_options.logger,
);

final Random? _random;

Expand Down Expand Up @@ -630,9 +625,6 @@ class SentryClient {
Future<SentryId?> _attachClientReportsAndSend(SentryEnvelope envelope) {
final clientReport = _options.recorder.flush();
envelope.addClientReport(clientReport);
return _taskQueue.enqueue(
() => _options.transport.send(envelope),
SentryId.empty(),
);
return _options.transport.send(envelope);
}
}
42 changes: 37 additions & 5 deletions dart/lib/src/transport/task_queue.dart
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
import 'dart:async';

import 'package:meta/meta.dart';

import '../../sentry.dart';
import '../client_reports/client_report_recorder.dart';
import '../client_reports/discard_reason.dart';
import 'data_category.dart';

typedef Task<T> = Future<T> Function();

class TaskQueue<T> {
TaskQueue(this._maxQueueSize, this._logger);
@internal
abstract class TaskQueue<T> {
Future<T> enqueue(Task<T> task, T fallbackResult, DataCategory category);
}

@internal
class DefaultTaskQueue<T> implements TaskQueue<T> {
DefaultTaskQueue(this._maxQueueSize, this._logger, this._recorder);

final int _maxQueueSize;
final SentryLogger _logger;
final ClientReportRecorder _recorder;

int _queueCount = 0;

Future<T> enqueue(Task<T> task, T fallbackResult) async {
@override
Future<T> enqueue(
Task<T> task,
T fallbackResult,
DataCategory category,
) async {
if (_queueCount >= _maxQueueSize) {
_logger(SentryLevel.warning,
'Task dropped due to backpressure. Avoid capturing in a tight loop.');
_recorder.recordLostEvent(DiscardReason.queueOverflow, category);
_logger(
SentryLevel.warning,
'Task dropped due to reaching max ($_maxQueueSize} parallel tasks.).',
);
return fallbackResult;
} else {
_queueCount++;
Expand All @@ -27,3 +47,15 @@
}
}
}

@internal
class NoOpTaskQueue<T> implements TaskQueue<T> {
@override

Check warning on line 53 in dart/lib/src/transport/task_queue.dart

View check run for this annotation

Codecov / codecov/patch

dart/lib/src/transport/task_queue.dart#L53

Added line #L53 was not covered by tests
Future<T> enqueue(
Task<T> task,
T fallbackResult,
DataCategory category,
) {
return task();

Check warning on line 59 in dart/lib/src/transport/task_queue.dart

View check run for this annotation

Codecov / codecov/patch

dart/lib/src/transport/task_queue.dart#L59

Added line #L59 was not covered by tests
}
}
40 changes: 34 additions & 6 deletions dart/test/transport/tesk_queue_test.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import 'dart:async';

import 'package:sentry/src/client_reports/discard_reason.dart';
import 'package:sentry/src/transport/data_category.dart';
import 'package:sentry/src/transport/task_queue.dart';
import 'package:test/test.dart';

import '../mocks/mock_client_report_recorder.dart';
import '../test_utils.dart';

void main() {
Expand All @@ -25,7 +28,7 @@ void main() {
await Future.delayed(Duration(milliseconds: 1));
completedTasks += 1;
return 1 + 1;
}, -1));
}, -1, DataCategory.error));
}

// This will always await the other futures, even if they are running longer, as it was scheduled after them.
Expand All @@ -48,7 +51,7 @@ void main() {
print('Completed task $i');
completedTasks += 1;
return 1 + 1;
}, -1));
}, -1, DataCategory.error));
}

print('Started waiting for first 5 tasks');
Expand All @@ -62,7 +65,7 @@ void main() {
print('Completed task $i');
completedTasks += 1;
return 1 + 1;
}, -1));
}, -1, DataCategory.error));
}

print('Started waiting for second 5 tasks');
Expand All @@ -83,7 +86,7 @@ void main() {
await Future.delayed(Duration(milliseconds: 1));
completedTasks += 1;
return 1 + 1;
}, -1);
}, -1, DataCategory.error);
}
expect(completedTasks, 10);
});
Expand All @@ -98,20 +101,45 @@ void main() {
await sut.enqueue(() async {
completedTasks += 1;
throw Error();
}, -1);
}, -1, DataCategory.error);
} catch (_) {
// Ignore
}
}
expect(completedTasks, 10);
});

test('recording dropped event when category set', () async {
final sut = fixture.getSut(maxQueueSize: 5);

for (int i = 0; i < 10; i++) {
unawaited(sut.enqueue(() async {
print('Task $i');
return 1 + 1;
}, -1, DataCategory.error));
}

// This will always await the other futures, even if they are running longer, as it was scheduled after them.
print('Started waiting for first 5 tasks');
await Future.delayed(Duration(milliseconds: 1));
print('Stopped waiting for first 5 tasks');

expect(fixture.clientReportRecorder.discardedEvents.length, 5);
for (final event in fixture.clientReportRecorder.discardedEvents) {
expect(event.reason, DiscardReason.queueOverflow);
expect(event.category, DataCategory.error);
expect(event.quantity, 1);
}
});
});
}

class Fixture {
final options = defaultTestOptions();

late var clientReportRecorder = MockClientReportRecorder();

TaskQueue<int> getSut({required int maxQueueSize}) {
return TaskQueue(maxQueueSize, options.logger);
return DefaultTaskQueue(maxQueueSize, options.logger, clientReportRecorder);
}
}
Loading