From ccd054525dae14d5bee678252f430febf61b6af7 Mon Sep 17 00:00:00 2001 From: Sebastian Villena <97059974+ruisebas@users.noreply.github.com> Date: Wed, 1 Nov 2023 14:53:31 -0400 Subject: [PATCH] fix(Analytics): Fixing crash when attempting to submit events while a previous submission is in progress (#3331) --- .../Analytics/AnalyticsClient.swift | 6 +-- .../Analytics/EventRecorder.swift | 49 +++++++++++-------- .../AnalyticsClientTests.swift | 13 +++-- .../EventRecorderTests.swift | 4 +- .../Mocks/MockEventRecorder.swift | 6 ++- 5 files changed, 45 insertions(+), 33 deletions(-) diff --git a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/AnalyticsClient.swift b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/AnalyticsClient.swift index 550dc37fc1..6ed75f1ab6 100644 --- a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/AnalyticsClient.swift +++ b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/AnalyticsClient.swift @@ -23,7 +23,7 @@ public protocol AnalyticsClientBehaviour: Actor { func removeGlobalAttribute(forKey key: String, forEventType eventType: String) func removeGlobalMetric(forKey key: String) func removeGlobalMetric(forKey key: String, forEventType eventType: String) - func record(_ event: PinpointEvent) throws + func record(_ event: PinpointEvent) async throws func setRemoteGlobalAttributes(_ attributes: [String: String]) func removeAllRemoteGlobalAttributes() @@ -231,7 +231,7 @@ actor AnalyticsClient: AnalyticsClientBehaviour { session: sessionProvider()) } - func record(_ event: PinpointEvent) throws { + func record(_ event: PinpointEvent) async throws { // Add event type attributes if let eventAttributes = eventTypeAttributes[event.eventType] { for (key, attribute) in eventAttributes { @@ -256,7 +256,7 @@ actor AnalyticsClient: AnalyticsClientBehaviour { event.addMetric(metric, forKey: key) } - try eventRecorder.save(event) + try await eventRecorder.save(event) } @discardableResult diff --git a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/EventRecorder.swift b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/EventRecorder.swift index 21ee1cc88e..15ceb143a8 100644 --- a/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/EventRecorder.swift +++ b/AmplifyPlugins/Internal/Sources/InternalAWSPinpoint/Analytics/EventRecorder.swift @@ -13,8 +13,8 @@ import enum AwsCommonRuntimeKit.CommonRunTimeError import Foundation /// AnalyticsEventRecording saves and submits pinpoint events -protocol AnalyticsEventRecording { - var pinpointClient: PinpointClientProtocol { get } +protocol AnalyticsEventRecording: Actor { + nonisolated var pinpointClient: PinpointClientProtocol { get } /// Saves a pinpoint event to storage /// - Parameter event: A PinpointEvent @@ -35,12 +35,13 @@ protocol AnalyticsEventRecording { } /// An AnalyticsEventRecording implementation that stores and submits pinpoint events -class EventRecorder: AnalyticsEventRecording { - let appId: String - let storage: AnalyticsEventStorage - let pinpointClient: PinpointClientProtocol - let endpointClient: EndpointClientBehaviour +actor EventRecorder: AnalyticsEventRecording { + private let appId: String + private let storage: AnalyticsEventStorage private var submittedEvents: [PinpointEvent] = [] + private var submissionTask: Task<[PinpointEvent], Error>? + nonisolated let endpointClient: EndpointClientBehaviour + nonisolated let pinpointClient: PinpointClientProtocol /// Initializer for Event Recorder /// - Parameters: @@ -66,31 +67,37 @@ class EventRecorder: AnalyticsEventRecording { func save(_ event: PinpointEvent) throws { log.verbose("saveEvent: \(event)") try storage.saveEvent(event) - try self.storage.checkDiskSize(limit: Constants.pinpointClientByteLimitDefault) + try storage.checkDiskSize(limit: Constants.pinpointClientByteLimitDefault) } func updateAttributesOfEvents(ofType eventType: String, withSessionId sessionId: PinpointSession.SessionId, setAttributes attributes: [String: String]) throws { - try self.storage.updateEvents(ofType: eventType, + try storage.updateEvents(ofType: eventType, withSessionId: sessionId, setAttributes: attributes) } - /// Submit all locally stored events in batches - /// If event submission fails, the event retry count is increment otherwise event is marked dirty and available for deletion in the local storage if retry count exceeds 3 - /// If event submission succeeds, the event is removed from local storage + /// Submit all locally stored events in batches. If a previous submission is in progress, it waits until it's completed before proceeding. + /// When the submission for an event is accepted, the event is removed from local storage + /// When the submission for an event is rejected, the event retry count is incremented in the local storage. Events that exceed the maximum retry count (3) are purged. /// - Returns: A collection of events submitted to Pinpoint func submitAllEvents() async throws -> [PinpointEvent] { - submittedEvents = [] - let eventsBatch = try getBatchRecords() - if eventsBatch.count > 0 { - let endpointProfile = await endpointClient.currentEndpointProfile() - try await processBatch(eventsBatch, endpointProfile: endpointProfile) - } else { - log.verbose("No events to submit") + let task = Task { [submissionTask] in + // Wait for the previous submission to complete, regardless of its result + _ = try? await submissionTask?.value + submittedEvents = [] + let eventsBatch = try getBatchRecords() + if eventsBatch.count > 0 { + let endpointProfile = await endpointClient.currentEndpointProfile() + try await processBatch(eventsBatch, endpointProfile: endpointProfile) + } else { + log.verbose("No events to submit") + } + return submittedEvents } - return submittedEvents + submissionTask = task + return try await task.value } private func getBatchRecords() throws -> [PinpointEvent] { @@ -343,7 +350,7 @@ extension EventRecorder: DefaultLogger { public static var log: Logger { Amplify.Logging.logger(forCategory: CategoryType.analytics.displayName, forNamespace: String(describing: self)) } - public var log: Logger { + nonisolated public var log: Logger { Self.log } } diff --git a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/AnalyticsClientTests.swift b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/AnalyticsClientTests.swift index 3393628ff2..4e0be941ca 100644 --- a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/AnalyticsClientTests.swift +++ b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/AnalyticsClientTests.swift @@ -85,8 +85,9 @@ class AnalyticsClientTests: XCTestCase { do { try await analyticsClient.record(event) - XCTAssertEqual(eventRecorder.saveCount, 1) - guard let savedEvent = eventRecorder.lastSavedEvent else { + let saveCount = await eventRecorder.saveCount + XCTAssertEqual(saveCount, 1) + guard let savedEvent = await eventRecorder.lastSavedEvent else { XCTFail("Expected saved event") return } @@ -118,8 +119,9 @@ class AnalyticsClientTests: XCTestCase { do { try await analyticsClient.record(event) - XCTAssertEqual(eventRecorder.saveCount, 1) - guard let savedEvent = eventRecorder.lastSavedEvent else { + let saveCount = await eventRecorder.saveCount + XCTAssertEqual(saveCount, 1) + guard let savedEvent = await eventRecorder.lastSavedEvent else { XCTFail("Expected saved event") return } @@ -142,7 +144,8 @@ class AnalyticsClientTests: XCTestCase { func testSubmit() async { do { try await analyticsClient.submitEvents() - XCTAssertEqual(eventRecorder.submitCount, 1) + let submitCount = await eventRecorder.submitCount + XCTAssertEqual(submitCount, 1) } catch { XCTFail("Unexpected exception while attempting to submit events") } diff --git a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/EventRecorderTests.swift b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/EventRecorderTests.swift index 004860b037..bf7a70f69c 100644 --- a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/EventRecorderTests.swift +++ b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/EventRecorderTests.swift @@ -48,7 +48,7 @@ class EventRecorderTests: XCTestCase { /// - Given: a event recorder /// - When: a new pinpoint event is aved /// - Then: the event is saved to storage followed by a disk size check - func testSaveEvent() { + func testSaveEvent() async { let session = PinpointSession(sessionId: "1", startTime: Date(), stopTime: nil) let event = PinpointEvent(id: "1", eventType: "eventType", eventDate: Date(), session: session) @@ -56,7 +56,7 @@ class EventRecorderTests: XCTestCase { XCTAssertEqual(storage.checkDiskSizeCallCount, 1) do { - try recorder.save(event) + try await recorder.save(event) } catch { XCTFail("Failed to save events") } diff --git a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/Mocks/MockEventRecorder.swift b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/Mocks/MockEventRecorder.swift index d27fb1b7f6..e73512324b 100644 --- a/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/Mocks/MockEventRecorder.swift +++ b/AmplifyPlugins/Internal/Tests/InternalAWSPinpointUnitTests/Mocks/MockEventRecorder.swift @@ -9,8 +9,10 @@ import AWSPinpoint @_spi(InternalAWSPinpoint) @testable import InternalAWSPinpoint -class MockEventRecorder: AnalyticsEventRecording { - var pinpointClient: PinpointClientProtocol = MockPinpointClient() +actor MockEventRecorder: AnalyticsEventRecording { + nonisolated var pinpointClient: PinpointClientProtocol { + MockPinpointClient() + } var saveCount = 0 var lastSavedEvent: PinpointEvent?