Skip to content

Commit

Permalink
TASK: Move transactional logic _on_ projection as it does not belong …
Browse files Browse the repository at this point in the history
…to the subscription store

... which technically only coincidentally uses the same connection and dbal instance

see neos/neos-development-collection#5321 (comment)
  • Loading branch information
mhsdesign committed Nov 27, 2024
1 parent 086b5a1 commit 90ed483
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 16 deletions.
37 changes: 37 additions & 0 deletions Classes/Infrastructure/ProjectionTransactionTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Infrastructure;

use Neos\ContentRepository\Core\Projection\ProjectionInterface;

/**
* @phpstan-require-implements ProjectionInterface
* @api to simplify creating a custom dbal projection
*/
trait ProjectionTransactionTrait
{
/**
* DBAL default implementation for {@see ProjectionInterface::transactional()}
*/
public function transactional(\Closure $closure): void
{
if ($this->dbal->isTransactionActive() === false) {
/** @phpstan-ignore argument.templateType */
$this->dbal->transactional($closure);
return;
}
// technically we could leverage nested transactions from dbal, which effectively does the same.
// but that requires us to enable this globally first via setNestTransactionsWithSavepoints also making this explicit is more transparent:
$this->dbal->createSavepoint('PROJECTION');
try {
$closure();
} catch (\Throwable $e) {
// roll back the partially applied event on the projection
$this->dbal->rollbackSavepoint('PROJECTION');
throw $e;
}
$this->dbal->releaseSavepoint('PROJECTION');
}
}
10 changes: 10 additions & 0 deletions Classes/Projection/ProjectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public function setUp(): void;
*/
public function status(): ProjectionStatus;

/**
* Must invoke the closure which will update the catchup hooks and {@see apply}.
* Additionally, to guarantee exactly once delivery and also to behave correct during exceptions (even fatal ones),
* a database transaction should be started, or if a transaction is already active on the same connection save points
* must be used and rolled back on error.
*
* @param-immediately-invoked-callable $closure
*/
public function transactional(\Closure $closure): void;

public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void;

/**
Expand Down
10 changes: 3 additions & 7 deletions Classes/Subscription/Engine/SubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,12 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;
}
$this->subscriptionStore->createSavepoint();
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) roll back the partially applied event on the subscriber
$this->subscriptionStore->rollbackSavepoint();
// 2.) for the leftover events we are not including this failed subscription for catchup
// 1.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 3.) update the subscription error state on either its unchanged or new position (if some events worked)
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
Expand All @@ -367,7 +364,7 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
$error->throwable
),
);
// 4.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed
// 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed
// todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
Expand All @@ -381,7 +378,6 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
continue;
}
// HAPPY Case:
$this->subscriptionStore->releaseSavepoint();
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;
}
$numberOfProcessedEvents++;
Expand Down
6 changes: 0 additions & 6 deletions Classes/Subscription/Store/SubscriptionStoreInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,4 @@ public function update(
* @return T
*/
public function transactional(\Closure $closure): mixed;

public function createSavepoint(): void;

public function releaseSavepoint(): void;

public function rollbackSavepoint(): void;
}
8 changes: 5 additions & 3 deletions Classes/Subscription/Subscriber/ProjectionSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void

public function handle(EventInterface $event, EventEnvelope $eventEnvelope): void
{
$this->catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$this->projection->apply($event, $eventEnvelope);
$this->catchUpHook?->onAfterEvent($event, $eventEnvelope);
$this->projection->transactional(function () use ($event, $eventEnvelope) {
$this->catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$this->projection->apply($event, $eventEnvelope);
$this->catchUpHook?->onAfterEvent($event, $eventEnvelope);
});
}

public function onAfterCatchUp(): void
Expand Down

0 comments on commit 90ed483

Please sign in to comment.