Skip to content

Commit 32ed7e7

Browse files
authored
Event sourcing: decouple from core (#449)
* move only side effects with pure event sourced aggregate logic * small refactor * make Definition and Reference Compilable Builders * decouple core from event sourcing: load aggregate * decouple core from event sourcing: save aggregate * refactor * add aggregate repository storage tests Taken from RepositoryStorageTest * review * remove $versionBeforeHandling from AggregateRepository interface * decouple aggregate module from event sourcing * move classes around * remove isEventSourced from RepositoryBuilder interface * not nullable logger * better error message * fix phpstan
1 parent be9bb1a commit 32ed7e7

File tree

41 files changed

+778
-839
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+778
-839
lines changed

packages/Dbal/src/DocumentStore/DocumentStoreAggregateRepositoryBuilder.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ public function canHandle(string $aggregateClassName): bool
2121
return true;
2222
}
2323

24-
public function isEventSourced(): bool
25-
{
26-
return false;
27-
}
28-
2924
public function compile(MessagingContainerBuilder $builder): Definition
3025
{
3126
return new Definition(DocumentStoreAggregateRepository::class, [

packages/Dbal/src/ObjectManager/DoctrineORMRepositoryBuilder.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ public function canHandle(string $aggregateClassName): bool
2828
return in_array($aggregateClassName, $this->dbalConfiguration->getDoctrineORMClasses());
2929
}
3030

31-
public function isEventSourced(): bool
32-
{
33-
return false;
34-
}
35-
3631
public function compile(MessagingContainerBuilder $builder): Definition
3732
{
3833
return new Definition(ManagerRegistryRepository::class, [
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
<?php
2+
/*
3+
* licence Apache-2.0
4+
*/
5+
declare(strict_types=1);
6+
7+
namespace Ecotone\EventSourcing;
8+
9+
use Ecotone\Messaging\Handler\ClassDefinition;
10+
use Ecotone\Messaging\Handler\Enricher\PropertyEditorAccessor;
11+
use Ecotone\Messaging\Handler\Enricher\PropertyPath;
12+
use Ecotone\Messaging\Handler\Enricher\PropertyReaderAccessor;
13+
use Ecotone\Messaging\Handler\TypeDescriptor;
14+
use Ecotone\Messaging\Store\Document\DocumentException;
15+
use Ecotone\Messaging\Store\Document\DocumentStore;
16+
use Ecotone\Messaging\Support\Assert;
17+
use Ecotone\Messaging\Support\GenericMessage;
18+
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
19+
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\ResolvedAggregate;
20+
use Ecotone\Modelling\Attribute\AggregateVersion;
21+
use Ecotone\Modelling\BaseEventSourcingConfiguration;
22+
use Ecotone\Modelling\EventSourcedRepository;
23+
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
24+
use Ecotone\Modelling\Repository\AggregateRepository;
25+
use Psr\Container\ContainerInterface;
26+
use Psr\Log\LoggerInterface;
27+
28+
class EventSourcedRepositoryAdapter implements AggregateRepository
29+
{
30+
public const SNAPSHOT_COLLECTION = 'aggregate_snapshots_';
31+
32+
public function __construct(
33+
private EventSourcedRepository $eventSourcedRepository,
34+
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
35+
private BaseEventSourcingConfiguration $eventSourcingConfiguration,
36+
private GroupedEventSourcingExecutor $eventSourcingHandlerExecutor,
37+
private ContainerInterface $container,
38+
private PropertyEditorAccessor $propertyEditorAccessor,
39+
private bool $isDefaultRepository,
40+
private LoggerInterface $logger,
41+
) {
42+
}
43+
44+
public function canHandle(string $aggregateClassName): bool
45+
{
46+
if ($this->isDefaultRepository && $this->aggregateDefinitionRegistry->has($aggregateClassName)) {
47+
$aggregateDefinition = $this->aggregateDefinitionRegistry->getFor($aggregateClassName);
48+
return $aggregateDefinition->isEventSourced();
49+
}
50+
return $this->eventSourcedRepository->canHandle($aggregateClassName);
51+
}
52+
53+
public function findBy(string $aggregateClassName, array $identifiers): ?ResolvedAggregate
54+
{
55+
$aggregate = null;
56+
$aggregateVersion = null;
57+
foreach ($this->eventSourcingConfiguration->getSnapshotsConfig() as $aggregateClass => $config) {
58+
if ($aggregateClass === $aggregateClassName) {
59+
/** @var DocumentStore $documentStore */
60+
$documentStore = $this->container->get($config['documentStore']);
61+
62+
try {
63+
$aggregate = $documentStore->findDocument(self::getSnapshotCollectionName($aggregateClassName), self::getSnapshotDocumentId($identifiers));
64+
} catch (DocumentException $documentException) {
65+
$this->logger->error("Failure during loading snapshot for aggregate {$aggregateClassName} with identifiers " . json_encode($identifiers) . '. Snapshot ignored to self system system. Error: ' . $documentException->getMessage(), [
66+
'exception' => $documentException,
67+
]);
68+
}
69+
70+
if ($aggregate !== null && $aggregate::class === $aggregateClassName) {
71+
$aggregateVersion = $this->getAggregateVersion($aggregate);
72+
Assert::isTrue($aggregateVersion > 0, sprintf('Serialization for snapshot of %s is set incorrectly, it does not serialize aggregate version', $aggregate::class));
73+
} elseif ($aggregate !== null) {
74+
$this->logger->error("Snapshot for aggregate {$aggregateClassName} was found, but it is not instance of {$aggregateClassName}. It is type of " . gettype($aggregate) . '. Snapshot ignored to self-heal system.');
75+
$aggregate = null;
76+
}
77+
}
78+
}
79+
80+
$eventStream = $this->eventSourcedRepository->findBy($aggregateClassName, $identifiers, $aggregate === null ? 1 : ($aggregateVersion + 1));
81+
82+
if ($eventStream->getEvents()) {
83+
$aggregateVersion = $eventStream->getAggregateVersion();
84+
$aggregate = $this->eventSourcingHandlerExecutor->fillFor($aggregateClassName, $aggregate, $eventStream->getEvents());
85+
}
86+
87+
if (! $aggregate) {
88+
return null;
89+
}
90+
91+
$aggregateClassDefinition = $this->aggregateDefinitionRegistry->getFor($aggregateClassName);
92+
if (! is_null($aggregateVersion) && $aggregateClassDefinition->isAggregateVersionAutomaticallyIncreased()) {
93+
$this->propertyEditorAccessor->enrichDataWith(
94+
PropertyPath::createWith($aggregateClassDefinition->getAggregateVersionProperty()), $aggregate, $aggregateVersion, GenericMessage::createWithEmptyHeaders(""), null);
95+
}
96+
97+
return new ResolvedAggregate(
98+
$aggregateClassDefinition,
99+
false,
100+
$aggregate,
101+
$aggregateVersion,
102+
$identifiers,
103+
[],
104+
);
105+
}
106+
107+
public function save(ResolvedAggregate $aggregate, array $metadata): void
108+
{
109+
$version = $aggregate->getVersionBeforeHandling();
110+
111+
if ($this->eventSourcingConfiguration->useSnapshotFor($aggregate->getAggregateClassName())) {
112+
$snapshotTriggerThreshold = $this->eventSourcingConfiguration->getSnapshotTriggerThresholdFor($aggregate->getAggregateClassName());
113+
foreach ($aggregate->getEvents() as $event) {
114+
$version += 1;
115+
if ($version % $snapshotTriggerThreshold === 0) {
116+
$documentStore = $this->container->get(
117+
$this->eventSourcingConfiguration->getDocumentStoreReferenceFor($aggregate->getAggregateClassName())
118+
);
119+
$documentStore->upsertDocument(self::getSnapshotCollectionName($aggregate->getAggregateClassName()), self::getSnapshotDocumentId($aggregate->getIdentifiers()), $aggregate->getAggregateInstance());
120+
}
121+
}
122+
}
123+
124+
$this->eventSourcedRepository->save(
125+
$aggregate->getIdentifiers(),
126+
$aggregate->getAggregateClassDefinition()->getClassName(),
127+
$aggregate->getEvents(),
128+
$metadata,
129+
$aggregate->getVersionBeforeHandling());
130+
}
131+
132+
private function getAggregateVersion(object|array|string $aggregate): mixed
133+
{
134+
$propertyReader = new PropertyReaderAccessor();
135+
$versionAnnotation = TypeDescriptor::create(AggregateVersion::class);
136+
$aggregateVersionPropertyName = null;
137+
foreach (ClassDefinition::createFor(TypeDescriptor::createFromVariable($aggregate))->getProperties() as $property) {
138+
if ($property->hasAnnotation($versionAnnotation)) {
139+
$aggregateVersionPropertyName = $property->getName();
140+
break;
141+
}
142+
}
143+
144+
return $propertyReader->getPropertyValue(
145+
PropertyPath::createWith($aggregateVersionPropertyName),
146+
$aggregate
147+
);
148+
}
149+
150+
private static function getSnapshotCollectionName(string $aggregateClassname): string
151+
{
152+
return self::SNAPSHOT_COLLECTION . $aggregateClassname;
153+
}
154+
155+
private static function getSnapshotDocumentId(array $identifiers): string
156+
{
157+
return count($identifiers) === 1 ? (string)reset($identifiers) : json_encode($identifiers);
158+
}
159+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
/*
3+
* licence Apache-2.0
4+
*/
5+
declare(strict_types=1);
6+
7+
namespace Ecotone\EventSourcing;
8+
9+
use Ecotone\Messaging\Config\Container\Definition;
10+
use Ecotone\Messaging\Config\Container\Reference;
11+
use Ecotone\Messaging\Handler\Enricher\PropertyEditorAccessor;
12+
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
13+
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
14+
use Ecotone\Modelling\BaseEventSourcingConfiguration;
15+
use Ecotone\Modelling\EventSourcedRepository;
16+
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
17+
use Ecotone\Modelling\Repository\AggregateRepositoryBuilder;
18+
use Psr\Container\ContainerInterface;
19+
20+
class EventSourcedRepositoryAdapterBuilder implements AggregateRepositoryBuilder
21+
{
22+
public function __construct(private BaseEventSourcingConfiguration $baseEventSourcingConfiguration)
23+
{
24+
}
25+
26+
public function canHandle(string $repositoryClassName): bool
27+
{
28+
return \is_a($repositoryClassName, EventSourcedRepository::class, true);
29+
}
30+
31+
public function compile(string $referenceId, bool $isDefault): Definition|Reference
32+
{
33+
return new Definition(EventSourcedRepositoryAdapter::class, [
34+
new Reference($referenceId),
35+
new Reference(AggregateDefinitionRegistry::class),
36+
$this->baseEventSourcingConfiguration,
37+
new Reference(GroupedEventSourcingExecutor::class),
38+
new Reference(ContainerInterface::class),
39+
new Reference(PropertyEditorAccessor::class),
40+
$isDefault,
41+
new Reference(LoggingGateway::class),
42+
]);
43+
}
44+
}

packages/Ecotone/src/Lite/Test/Configuration/InMemoryRepositoryBuilder.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,6 @@ public function canHandle(string $aggregateClassName): bool
5353
return isset($this->aggregateClassNames[$aggregateClassName]);
5454
}
5555

56-
public function isEventSourced(): bool
57-
{
58-
return $this->isEventSourced;
59-
}
60-
6156
public function compile(MessagingContainerBuilder $builder): Definition
6257
{
6358
return match ($this->isEventSourced) {

packages/Ecotone/src/Messaging/Config/Container/Definition.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function getMethodCalls(): array
6565
return $this->methodCalls;
6666
}
6767

68-
public function compile(MessagingContainerBuilder $builder): Definition|Reference
68+
public function compile(MessagingContainerBuilder $builder): self
6969
{
7070
return $this;
7171
}

packages/Ecotone/src/Messaging/Config/Container/Reference.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/**
99
* licence Apache-2.0
1010
*/
11-
class Reference
11+
class Reference implements CompilableBuilder
1212
{
1313
public function __construct(protected string $id, protected int $invalidBehavior = ContainerImplementation::EXCEPTION_ON_INVALID_REFERENCE)
1414
{
@@ -43,6 +43,11 @@ public function getInvalidBehavior(): int
4343
return $this->invalidBehavior;
4444
}
4545

46+
public function compile(MessagingContainerBuilder $builder): self
47+
{
48+
return $this;
49+
}
50+
4651
public function __toString(): string
4752
{
4853
return $this->id;

packages/Ecotone/src/Messaging/Config/ModuleClassList.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
use Ecotone\Messaging\Handler\Logger\Config\MessageHandlerLogger;
4848
use Ecotone\Modelling\CommandBus;
4949
use Ecotone\Modelling\Config\AggregrateModule;
50+
use Ecotone\Modelling\Config\EventSourcedRepositoryModule;
5051
use Ecotone\Modelling\Config\InstantRetry\InstantRetryModule;
5152
use Ecotone\Modelling\Config\MessageHandlerRoutingModule;
5253
use Ecotone\Modelling\Config\ServiceHandlerModule;
@@ -94,6 +95,7 @@ class ModuleClassList
9495
MessageConsumerModule::class,
9596
InstantRetryModule::class,
9697
DynamicMessageChannelModule::class,
98+
EventSourcedRepositoryModule::class,
9799

98100
/** Attribute based configurations */
99101
MessageHeadersPropagatorInterceptor::class,

packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/MethodInvokerAggregateObjectResolver.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,18 @@
77
namespace Ecotone\Messaging\Handler\Processor\MethodInvoker;
88

99
use Ecotone\Messaging\Message;
10+
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\ResolvedAggregate;
1011
use Ecotone\Modelling\AggregateMessage;
1112

1213
class MethodInvokerAggregateObjectResolver implements MethodInvokerObjectResolver
1314
{
1415
public function resolveFor(Message $message): object
1516
{
16-
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
17+
$aggregate = $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
18+
if ($aggregate instanceof ResolvedAggregate) {
19+
return $aggregate->getAggregateInstance();
20+
} else {
21+
return $aggregate;
22+
}
1723
}
1824
}

packages/Ecotone/src/Modelling/AggregateFlow/CallAggregate/CallAggregateResultToMessageConverter.php

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
use Ecotone\Messaging\Handler\TypeDescriptor;
1313
use Ecotone\Messaging\Message;
1414
use Ecotone\Messaging\Support\MessageBuilder;
15-
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
1615
use Ecotone\Modelling\AggregateMessage;
1716

1817
/**
@@ -21,8 +20,6 @@
2120
final class CallAggregateResultToMessageConverter implements ResultToMessageConverter
2221
{
2322
public function __construct(
24-
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
25-
private string $aggregateClass,
2623
private ?Type $returnType,
2724
private PropertyReaderAccessor $propertyReaderAccessor,
2825
private bool $isCommandHandler,
@@ -44,10 +41,6 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
4441
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE) : null;
4542
$versionBeforeHandling = $requestMessage->getHeaders()->containsKey(AggregateMessage::TARGET_VERSION) ? $requestMessage->getHeaders()->get(AggregateMessage::TARGET_VERSION) : null;
4643

47-
if ($result === null && $this->aggregateDefinitionRegistry->getFor(TypeDescriptor::create($this->aggregateClass))->isPureEventSourcedAggregate()) {
48-
$result = [];
49-
}
50-
5144
if (is_null($versionBeforeHandling) && $this->aggregateVersionProperty) {
5245
if ($this->isFactoryMethod) {
5346
$versionBeforeHandling = 0;

0 commit comments

Comments
 (0)