Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ public function canHandle(string $aggregateClassName): bool
return true;
}

public function isEventSourced(): bool
{
return false;
}

public function compile(MessagingContainerBuilder $builder): Definition
{
return new Definition(DocumentStoreAggregateRepository::class, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ public function canHandle(string $aggregateClassName): bool
return in_array($aggregateClassName, $this->dbalConfiguration->getDoctrineORMClasses());
}

public function isEventSourced(): bool
{
return false;
}

public function compile(MessagingContainerBuilder $builder): Definition
{
return new Definition(ManagerRegistryRepository::class, [
Expand Down
159 changes: 159 additions & 0 deletions packages/Ecotone/src/EventSourcing/EventSourcedRepositoryAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?php
/*
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\EventSourcing;

use Ecotone\Messaging\Handler\ClassDefinition;
use Ecotone\Messaging\Handler\Enricher\PropertyEditorAccessor;
use Ecotone\Messaging\Handler\Enricher\PropertyPath;
use Ecotone\Messaging\Handler\Enricher\PropertyReaderAccessor;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Store\Document\DocumentException;
use Ecotone\Messaging\Store\Document\DocumentStore;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\GenericMessage;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\ResolvedAggregate;
use Ecotone\Modelling\Attribute\AggregateVersion;
use Ecotone\Modelling\BaseEventSourcingConfiguration;
use Ecotone\Modelling\EventSourcedRepository;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\Repository\AggregateRepository;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;

class EventSourcedRepositoryAdapter implements AggregateRepository
{
public const SNAPSHOT_COLLECTION = 'aggregate_snapshots_';

public function __construct(
private EventSourcedRepository $eventSourcedRepository,
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
private BaseEventSourcingConfiguration $eventSourcingConfiguration,
private GroupedEventSourcingExecutor $eventSourcingHandlerExecutor,
private ContainerInterface $container,
private PropertyEditorAccessor $propertyEditorAccessor,
private bool $isDefaultRepository,
private LoggerInterface $logger,
) {
}

public function canHandle(string $aggregateClassName): bool
{
if ($this->isDefaultRepository && $this->aggregateDefinitionRegistry->has($aggregateClassName)) {
$aggregateDefinition = $this->aggregateDefinitionRegistry->getFor($aggregateClassName);
return $aggregateDefinition->isEventSourced();
}
return $this->eventSourcedRepository->canHandle($aggregateClassName);
}

public function findBy(string $aggregateClassName, array $identifiers): ?ResolvedAggregate
{
$aggregate = null;
$aggregateVersion = null;
foreach ($this->eventSourcingConfiguration->getSnapshotsConfig() as $aggregateClass => $config) {
if ($aggregateClass === $aggregateClassName) {
/** @var DocumentStore $documentStore */
$documentStore = $this->container->get($config['documentStore']);

try {
$aggregate = $documentStore->findDocument(self::getSnapshotCollectionName($aggregateClassName), self::getSnapshotDocumentId($identifiers));
} catch (DocumentException $documentException) {
$this->logger->error("Failure during loading snapshot for aggregate {$aggregateClassName} with identifiers " . json_encode($identifiers) . '. Snapshot ignored to self system system. Error: ' . $documentException->getMessage(), [
'exception' => $documentException,
]);
}

if ($aggregate !== null && $aggregate::class === $aggregateClassName) {
$aggregateVersion = $this->getAggregateVersion($aggregate);
Assert::isTrue($aggregateVersion > 0, sprintf('Serialization for snapshot of %s is set incorrectly, it does not serialize aggregate version', $aggregate::class));
} elseif ($aggregate !== null) {
$this->logger->error("Snapshot for aggregate {$aggregateClassName} was found, but it is not instance of {$aggregateClassName}. It is type of " . gettype($aggregate) . '. Snapshot ignored to self-heal system.');
$aggregate = null;
}
}
}

$eventStream = $this->eventSourcedRepository->findBy($aggregateClassName, $identifiers, $aggregate === null ? 1 : ($aggregateVersion + 1));

if ($eventStream->getEvents()) {
$aggregateVersion = $eventStream->getAggregateVersion();
$aggregate = $this->eventSourcingHandlerExecutor->fillFor($aggregateClassName, $aggregate, $eventStream->getEvents());
}

if (! $aggregate) {
return null;
}

$aggregateClassDefinition = $this->aggregateDefinitionRegistry->getFor($aggregateClassName);
if (! is_null($aggregateVersion) && $aggregateClassDefinition->isAggregateVersionAutomaticallyIncreased()) {
$this->propertyEditorAccessor->enrichDataWith(
PropertyPath::createWith($aggregateClassDefinition->getAggregateVersionProperty()), $aggregate, $aggregateVersion, GenericMessage::createWithEmptyHeaders(""), null);
}

return new ResolvedAggregate(
$aggregateClassDefinition,
false,
$aggregate,
$aggregateVersion,
$identifiers,
[],
);
}

public function save(ResolvedAggregate $aggregate, array $metadata): void
{
$version = $aggregate->getVersionBeforeHandling();

if ($this->eventSourcingConfiguration->useSnapshotFor($aggregate->getAggregateClassName())) {
$snapshotTriggerThreshold = $this->eventSourcingConfiguration->getSnapshotTriggerThresholdFor($aggregate->getAggregateClassName());
foreach ($aggregate->getEvents() as $event) {
$version += 1;
if ($version % $snapshotTriggerThreshold === 0) {
$documentStore = $this->container->get(
$this->eventSourcingConfiguration->getDocumentStoreReferenceFor($aggregate->getAggregateClassName())
);
$documentStore->upsertDocument(self::getSnapshotCollectionName($aggregate->getAggregateClassName()), self::getSnapshotDocumentId($aggregate->getIdentifiers()), $aggregate->getAggregateInstance());
}
}
}

$this->eventSourcedRepository->save(
$aggregate->getIdentifiers(),
$aggregate->getAggregateClassDefinition()->getClassName(),
$aggregate->getEvents(),
$metadata,
$aggregate->getVersionBeforeHandling());
}

private function getAggregateVersion(object|array|string $aggregate): mixed
{
$propertyReader = new PropertyReaderAccessor();
$versionAnnotation = TypeDescriptor::create(AggregateVersion::class);
$aggregateVersionPropertyName = null;
foreach (ClassDefinition::createFor(TypeDescriptor::createFromVariable($aggregate))->getProperties() as $property) {
if ($property->hasAnnotation($versionAnnotation)) {
$aggregateVersionPropertyName = $property->getName();
break;
}
}

return $propertyReader->getPropertyValue(
PropertyPath::createWith($aggregateVersionPropertyName),
$aggregate
);
}

private static function getSnapshotCollectionName(string $aggregateClassname): string
{
return self::SNAPSHOT_COLLECTION . $aggregateClassname;
}

private static function getSnapshotDocumentId(array $identifiers): string
{
return count($identifiers) === 1 ? (string)reset($identifiers) : json_encode($identifiers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
/*
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\EventSourcing;

use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\Enricher\PropertyEditorAccessor;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\BaseEventSourcingConfiguration;
use Ecotone\Modelling\EventSourcedRepository;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\Repository\AggregateRepositoryBuilder;
use Psr\Container\ContainerInterface;

class EventSourcedRepositoryAdapterBuilder implements AggregateRepositoryBuilder
{
public function __construct(private BaseEventSourcingConfiguration $baseEventSourcingConfiguration)
{
}

public function canHandle(string $repositoryClassName): bool
{
return \is_a($repositoryClassName, EventSourcedRepository::class, true);
}

public function compile(string $referenceId, bool $isDefault): Definition|Reference
{
return new Definition(EventSourcedRepositoryAdapter::class, [
new Reference($referenceId),
new Reference(AggregateDefinitionRegistry::class),
$this->baseEventSourcingConfiguration,
new Reference(GroupedEventSourcingExecutor::class),
new Reference(ContainerInterface::class),
new Reference(PropertyEditorAccessor::class),
$isDefault,
new Reference(LoggingGateway::class),
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ public function canHandle(string $aggregateClassName): bool
return isset($this->aggregateClassNames[$aggregateClassName]);
}

public function isEventSourced(): bool
{
return $this->isEventSourced;
}

public function compile(MessagingContainerBuilder $builder): Definition
{
return match ($this->isEventSourced) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public function getMethodCalls(): array
return $this->methodCalls;
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
public function compile(MessagingContainerBuilder $builder): self
{
return $this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* licence Apache-2.0
*/
class Reference
class Reference implements CompilableBuilder
{
public function __construct(protected string $id, protected int $invalidBehavior = ContainerImplementation::EXCEPTION_ON_INVALID_REFERENCE)
{
Expand Down Expand Up @@ -43,6 +43,11 @@ public function getInvalidBehavior(): int
return $this->invalidBehavior;
}

public function compile(MessagingContainerBuilder $builder): self
{
return $this;
}

public function __toString(): string
{
return $this->id;
Expand Down
2 changes: 2 additions & 0 deletions packages/Ecotone/src/Messaging/Config/ModuleClassList.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
use Ecotone\Messaging\Handler\Logger\Config\MessageHandlerLogger;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\AggregrateModule;
use Ecotone\Modelling\Config\EventSourcedRepositoryModule;
use Ecotone\Modelling\Config\InstantRetry\InstantRetryModule;
use Ecotone\Modelling\Config\MessageHandlerRoutingModule;
use Ecotone\Modelling\Config\ServiceHandlerModule;
Expand Down Expand Up @@ -94,6 +95,7 @@ class ModuleClassList
MessageConsumerModule::class,
InstantRetryModule::class,
DynamicMessageChannelModule::class,
EventSourcedRepositoryModule::class,

/** Attribute based configurations */
MessageHeadersPropagatorInterceptor::class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@
namespace Ecotone\Messaging\Handler\Processor\MethodInvoker;

use Ecotone\Messaging\Message;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\ResolvedAggregate;
use Ecotone\Modelling\AggregateMessage;

class MethodInvokerAggregateObjectResolver implements MethodInvokerObjectResolver
{
public function resolveFor(Message $message): object
{
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
$aggregate = $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
if ($aggregate instanceof ResolvedAggregate) {
return $aggregate->getAggregateInstance();
} else {
return $aggregate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\AggregateMessage;

/**
Expand All @@ -21,8 +20,6 @@
final class CallAggregateResultToMessageConverter implements ResultToMessageConverter
{
public function __construct(
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
private string $aggregateClass,
private ?Type $returnType,
private PropertyReaderAccessor $propertyReaderAccessor,
private bool $isCommandHandler,
Expand All @@ -44,10 +41,6 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE) : null;
$versionBeforeHandling = $requestMessage->getHeaders()->containsKey(AggregateMessage::TARGET_VERSION) ? $requestMessage->getHeaders()->get(AggregateMessage::TARGET_VERSION) : null;

if ($result === null && $this->aggregateDefinitionRegistry->getFor(TypeDescriptor::create($this->aggregateClass))->isPureEventSourcedAggregate()) {
$result = [];
}

if (is_null($versionBeforeHandling) && $this->aggregateVersionProperty) {
if ($this->isFactoryMethod) {
$versionBeforeHandling = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\Attribute\AggregateVersion;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingSaga;
Expand Down Expand Up @@ -120,8 +119,6 @@ public function compile(MessagingContainerBuilder $builder, array $aroundInterce
)
->withResultToMessageConverter(
new Definition(CallAggregateResultToMessageConverter::class, [
Reference::to(AggregateDefinitionRegistry::class),
$this->interfaceToCall->getInterfaceName(),
$this->interfaceToCall->getReturnType(),
new Reference(PropertyReaderAccessor::class),
$this->isCommandHandler,
Expand Down
Loading
Loading