From c93473e816c2b1608d754819f187e6a6e131e82e Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 6 Jan 2026 09:15:16 +0100 Subject: [PATCH 01/10] feat: async batched projection backfills --- Monorepo/Benchmark/ProjectingBenchmark.php | 2 +- .../Ecotone/src/Lite/Test/FlowTestSupport.php | 2 +- .../Projecting/BackfillExecutorHandler.php | 46 +++++++++++++++ .../Config/ProjectingConsoleCommands.php | 2 +- .../Projecting/Config/ProjectingModule.php | 35 +++++++++++- .../src/Projecting/NullPartitionProvider.php | 16 ------ .../src/Projecting/PartitionProvider.php | 18 +++++- .../src/Projecting/ProjectingManager.php | 57 +++++++++++++++++-- .../Projecting/SinglePartitionProvider.php | 25 ++++++++ .../AggregateIdPartitionProvider.php | 43 +++++++++++++- 10 files changed, 217 insertions(+), 29 deletions(-) create mode 100644 packages/Ecotone/src/Projecting/BackfillExecutorHandler.php delete mode 100644 packages/Ecotone/src/Projecting/NullPartitionProvider.php create mode 100644 packages/Ecotone/src/Projecting/SinglePartitionProvider.php diff --git a/Monorepo/Benchmark/ProjectingBenchmark.php b/Monorepo/Benchmark/ProjectingBenchmark.php index 840e1a8b0..46137c926 100644 --- a/Monorepo/Benchmark/ProjectingBenchmark.php +++ b/Monorepo/Benchmark/ProjectingBenchmark.php @@ -174,7 +174,7 @@ public function bench_ecotone_projection_backfill(): void Assert::assertEquals([], self::$ecotone->getQueryBus()->sendWithRouting('product.getPriceChange', self::$expectedProductIds[0]) ); - $projectionManager->backfill(); + $projectionManager->prepareBackfill(); Assert::assertEquals([ new PriceChange(100, 0), new PriceChange(120, 20), diff --git a/packages/Ecotone/src/Lite/Test/FlowTestSupport.php b/packages/Ecotone/src/Lite/Test/FlowTestSupport.php index 50aa9a5d7..ca6ab582e 100644 --- a/packages/Ecotone/src/Lite/Test/FlowTestSupport.php +++ b/packages/Ecotone/src/Lite/Test/FlowTestSupport.php @@ -237,7 +237,7 @@ public function triggerProjection(string|array $projectionName): self foreach ($projectionName as $name) { if ($this->getGateway(ProjectionRegistry::class)->has($name)) { - $this->getGateway(ProjectionRegistry::class)->get($name)->backfill(); + $this->getGateway(ProjectionRegistry::class)->get($name)->prepareBackfill(); } else { $this->getGateway(ProjectionManager::class)->triggerProjection($name); } diff --git a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php new file mode 100644 index 000000000..22c85fd82 --- /dev/null +++ b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php @@ -0,0 +1,46 @@ +projectionRegistry->get($projectionName); + + foreach ($projectingManager->getPartitionProvider()->partitions($limit, $offset) as $partition) { + $projectingManager->execute($partition, true); + if ($this->terminationListener->shouldTerminate()) { + break; + } + } + } +} + diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php index 62d48c85c..a90ad9951 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php @@ -42,7 +42,7 @@ public function backfillProjection(string $name): void if (! $this->registry->has($name)) { throw new InvalidArgumentException("There is no projection with name {$name}"); } - $this->registry->get($name)->backfill(); + $this->registry->get($name)->prepareBackfill(); } #[ConsoleCommand('ecotone:projection:delete')] diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 6569de0d4..3fc525165 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -20,13 +20,16 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener; +use Ecotone\Messaging\Gateway\MessagingEntrypoint; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\HeaderBuilder; +use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\PayloadBuilder; use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ValueBuilder; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder; use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; +use Ecotone\Projecting\BackfillExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; -use Ecotone\Projecting\NullPartitionProvider; +use Ecotone\Projecting\SinglePartitionProvider; use Ecotone\Projecting\PartitionProvider; use Ecotone\Projecting\ProjectingHeaders; use Ecotone\Projecting\ProjectingManager; @@ -97,11 +100,13 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $components[$projectionName][ProjectionStateStorage::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have projection state storage configured. Please check your configuration."), new Reference($reference), $components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."), - $components[$projectionName][PartitionProvider::class] ?? new Definition(NullPartitionProvider::class), + $components[$projectionName][PartitionProvider::class] ?? new Definition(SinglePartitionProvider::class), $projectionName, new Reference(TerminationListener::class), + new Reference(MessagingEntrypoint::class), $projectionBuilder->batchSize(), // batchSize $projectionBuilder->automaticInitialization(), + 100, // backfillBatchSize - default value, can be made configurable via #[Backfill] attribute ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); @@ -142,6 +147,32 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO new Definition(InMemoryProjectionRegistry::class, [$projectionRegistryMap]) ); + // Register BackfillExecutorHandler and its message handler + $messagingConfiguration->registerServiceDefinition( + BackfillExecutorHandler::class, + new Definition(BackfillExecutorHandler::class, [ + new Reference(ProjectionRegistry::class), + new Reference(TerminationListener::class), + ]) + ); + + $messagingConfiguration->registerMessageHandler( + MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + BackfillExecutorHandler::class, + InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), + [ + PayloadBuilder::create('projectionName'), + HeaderBuilder::createOptional('limit', 'backfill.limit'), + HeaderBuilder::createOptional('offset', 'backfill.offset'), + ], + ) + ) + ->withEndpointId('backfill_executor_handler') + ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) + ); + // Register console commands $messagingConfiguration->registerServiceDefinition(ProjectingConsoleCommands::class, new Definition(ProjectingConsoleCommands::class, [new Reference(ProjectionRegistry::class)])); } diff --git a/packages/Ecotone/src/Projecting/NullPartitionProvider.php b/packages/Ecotone/src/Projecting/NullPartitionProvider.php deleted file mode 100644 index 97ae79d3b..000000000 --- a/packages/Ecotone/src/Projecting/NullPartitionProvider.php +++ /dev/null @@ -1,16 +0,0 @@ - Partition keys + */ + public function partitions(?int $limit = null, int $offset = 0): iterable; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 769c38c48..89d770f48 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -8,11 +8,14 @@ namespace Ecotone\Projecting; use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener; +use Ecotone\Messaging\Gateway\MessagingEntrypoint; use InvalidArgumentException; use Throwable; class ProjectingManager { + private const DEFAULT_BACKFILL_BATCH_SIZE = 100; + public function __construct( private ProjectionStateStorage $projectionStateStorage, private ProjectorExecutor $projectorExecutor, @@ -20,12 +23,17 @@ public function __construct( private PartitionProvider $partitionProvider, private string $projectionName, private TerminationListener $terminationListener, + private MessagingEntrypoint $messagingEntrypoint, private int $batchSize = 1000, private bool $automaticInitialization = true, + private int $backfillBatchSize = self::DEFAULT_BACKFILL_BATCH_SIZE, ) { if ($batchSize < 1) { throw new InvalidArgumentException('Batch size must be at least 1'); } + if ($backfillBatchSize < 1) { + throw new InvalidArgumentException('Backfill batch size must be at least 1'); + } } public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void @@ -83,6 +91,16 @@ public function loadState(?string $partitionKey = null): ProjectionPartitionStat return $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); } + public function getPartitionProvider(): PartitionProvider + { + return $this->partitionProvider; + } + + public function getProjectionName(): string + { + return $this->projectionName; + } + public function init(): void { $this->projectionStateStorage->init($this->projectionName); @@ -97,16 +115,43 @@ public function delete(): void $this->projectorExecutor->delete(); } - public function backfill(): void + /** + * Prepares backfill by calculating batches and sending messages to BackfillExecutorHandler. + * Each batch message contains a limit and offset for processing a subset of partitions. + * This enables the backfill to be executed synchronously or asynchronously depending on configuration. + */ + public function prepareBackfill(): void { - foreach ($this->partitionProvider->partitions() as $partition) { - $this->execute($partition, true); - if ($this->terminationListener->shouldTerminate()) { - break; - } + $totalPartitions = $this->partitionProvider->count(); + + if ($totalPartitions === 0) { + return; + } + + $numberOfBatches = (int) ceil($totalPartitions / $this->backfillBatchSize); + + for ($batch = 0; $batch < $numberOfBatches; $batch++) { + $offset = $batch * $this->backfillBatchSize; + + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + [ + 'backfill.limit' => $this->backfillBatchSize, + 'backfill.offset' => $offset, + ], + BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + ); } } + /** + * @deprecated Use prepareBackfill() instead. This method is kept for backward compatibility. + */ + public function backfill(): void + { + $this->prepareBackfill(); + } + private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState { $projectionState = $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); diff --git a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php new file mode 100644 index 000000000..8ac349cc1 --- /dev/null +++ b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php @@ -0,0 +1,25 @@ += 1)) { + yield null; + } + } +} diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php index 28272c69c..5b9dcd858 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php @@ -27,7 +27,7 @@ public function __construct( ) { } - public function partitions(): iterable + public function count(): int { $connection = $this->getConnection(); $platform = $connection->getDatabasePlatform(); @@ -35,6 +35,43 @@ public function partitions(): iterable // Resolve table name at runtime using the provider $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + // Build platform-specific count query + if ($platform instanceof PostgreSQLPlatform) { + // PostgreSQL: Use JSONB operators + $result = $connection->executeQuery(<<>'_aggregate_id') + FROM {$streamTable} + WHERE metadata->>'_aggregate_type' = ? + SQL, [$this->aggregateType]); + } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { + // MySQL/MariaDB: Use generated indexed columns for better performance + $result = $connection->executeQuery(<<aggregateType]); + } else { + throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); + } + + return (int) $result->fetchOne(); + } + + public function partitions(?int $limit = null, int $offset = 0): iterable + { + $connection = $this->getConnection(); + $platform = $connection->getDatabasePlatform(); + + // Resolve table name at runtime using the provider + $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + + // Build pagination clause + $limitClause = ''; + if ($limit !== null) { + $limitClause = " LIMIT {$limit}"; + } + $offsetClause = $offset > 0 ? " OFFSET {$offset}" : ''; + // Build platform-specific query if ($platform instanceof PostgreSQLPlatform) { // PostgreSQL: Use JSONB operators @@ -42,6 +79,8 @@ public function partitions(): iterable SELECT DISTINCT metadata->>'_aggregate_id' AS aggregate_id FROM {$streamTable} WHERE metadata->>'_aggregate_type' = ? + ORDER BY aggregate_id + {$limitClause}{$offsetClause} SQL, [$this->aggregateType]); } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { // MySQL/MariaDB: Use generated indexed columns for better performance @@ -49,6 +88,8 @@ public function partitions(): iterable SELECT DISTINCT aggregate_id FROM {$streamTable} WHERE aggregate_type = ? + ORDER BY aggregate_id + {$limitClause}{$offsetClause} SQL, [$this->aggregateType]); } else { throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); From 68adb04b92f9611ca9b0ff85d0143261b04d70fd Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 8 Jan 2026 17:55:28 +0100 Subject: [PATCH 02/10] stream filtering --- .../Attribute/FromAggregateStream.php | 37 +++++ .../EventSourcing}/Attribute/FromStream.php | 4 + .../src/Messaging/Config/ModuleClassList.php | 2 + .../Projecting/BackfillExecutorHandler.php | 16 +- .../Projecting/Config/ProjectingModule.php | 5 + .../Config/StreamFilterRegistryModule.php | 138 ++++++++++++++++++ .../src/Projecting/PartitionProvider.php | 10 +- .../src/Projecting/ProjectingManager.php | 15 +- .../Projecting/SinglePartitionProvider.php | 4 +- .../Ecotone/src/Projecting/StreamFilter.php | 25 ++++ .../src/Projecting/StreamFilterRegistry.php | 32 ++++ .../src/Attribute/FromAggregateStream.php | 42 ------ .../src/Config/ProophProjectingModule.php | 94 +++++------- .../AggregateIdPartitionProvider.php | 19 ++- .../AggregateIdPartitionProviderBuilder.php | 4 +- 15 files changed, 323 insertions(+), 124 deletions(-) create mode 100644 packages/Ecotone/src/EventSourcing/Attribute/FromAggregateStream.php rename packages/{PdoEventSourcing/src => Ecotone/src/EventSourcing}/Attribute/FromStream.php (85%) create mode 100644 packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php create mode 100644 packages/Ecotone/src/Projecting/StreamFilter.php create mode 100644 packages/Ecotone/src/Projecting/StreamFilterRegistry.php delete mode 100644 packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php diff --git a/packages/Ecotone/src/EventSourcing/Attribute/FromAggregateStream.php b/packages/Ecotone/src/EventSourcing/Attribute/FromAggregateStream.php new file mode 100644 index 000000000..53fc3fe97 --- /dev/null +++ b/packages/Ecotone/src/EventSourcing/Attribute/FromAggregateStream.php @@ -0,0 +1,37 @@ +projectionRegistry->get($projectionName); + $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); - foreach ($projectingManager->getPartitionProvider()->partitions($limit, $offset) as $partition) { + foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { $projectingManager->execute($partition, true); if ($this->terminationListener->shouldTerminate()) { break; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 3fc525165..891b672a4 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -35,6 +35,7 @@ use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; use Ecotone\Projecting\ProjectionStateStorage; +use Ecotone\Projecting\StreamFilterRegistry; use Ecotone\Projecting\StreamSource; use Ramsey\Uuid\Uuid; @@ -101,6 +102,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO new Reference($reference), $components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."), $components[$projectionName][PartitionProvider::class] ?? new Definition(SinglePartitionProvider::class), + new Reference(StreamFilterRegistry::class), $projectionName, new Reference(TerminationListener::class), new Reference(MessagingEntrypoint::class), @@ -166,6 +168,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO PayloadBuilder::create('projectionName'), HeaderBuilder::createOptional('limit', 'backfill.limit'), HeaderBuilder::createOptional('offset', 'backfill.offset'), + HeaderBuilder::createOptional('streamName', 'backfill.streamName'), + HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), ], ) ) diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php new file mode 100644 index 000000000..a77aaea91 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -0,0 +1,138 @@ + $streamFilters */ + public function __construct(private array $streamFilters) + { + } + + public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static + { + return new self(self::collectStreamFilters($annotationRegistrationService)); + } + + /** + * Collects stream filters from all #[ProjectionV2] classes. + * This method can be reused by other modules that need stream filter information. + * + * @return array Map of projection name to stream filters + */ + public static function collectStreamFilters(AnnotationFinder $annotationFinder): array + { + $streamFilters = []; + + foreach ($annotationFinder->findAnnotatedClasses(ProjectionV2::class) as $classname) { + $projectionAttribute = $annotationFinder->getAttributeForClass($classname, ProjectionV2::class); + $projectionName = $projectionAttribute->name; + + foreach ($annotationFinder->getAnnotationsForClass($classname, FromStream::class) as $streamAttribute) { + $streamFilters[$projectionName][] = new StreamFilter( + $streamAttribute->stream, + $streamAttribute->aggregateType, + $streamAttribute->eventStoreReferenceName, + ); + } + + foreach ($annotationFinder->getAnnotationsForClass($classname, FromAggregateStream::class) as $aggregateStreamAttribute) { + $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName); + } + } + + return $streamFilters; + } + + private static function resolveFromAggregateStream( + AnnotationFinder $annotationFinder, + FromAggregateStream $attribute, + string $projectionName + ): StreamFilter { + $aggregateClass = $attribute->aggregateClass; + + $eventSourcingAggregateAttribute = $annotationFinder->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); + if ($eventSourcingAggregateAttribute === null) { + throw ConfigurationException::create("Class {$aggregateClass} referenced in #[FromAggregateStream] for projection {$projectionName} must be an EventSourcingAggregate."); + } + + // Resolve stream name from #[Stream] attribute if available + $streamName = $aggregateClass; + if (class_exists(Stream::class)) { + $streamAttribute = $annotationFinder->findAttributeForClass($aggregateClass, Stream::class); + $streamName = $streamAttribute?->getName() ?? $aggregateClass; + } + + // Resolve aggregate type from #[AggregateType] attribute if available + $aggregateType = $aggregateClass; + $aggregateTypeAttribute = $annotationFinder->findAttributeForClass($aggregateClass, AggregateType::class); + $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; + + return new StreamFilter($streamName, $aggregateType, $attribute->eventStoreReferenceName); + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + $filtersDefinition = []; + foreach ($this->streamFilters as $projectionName => $filters) { + $filtersDefinition[$projectionName] = []; + foreach ($filters as $filter) { + $filtersDefinition[$projectionName][] = new Definition(StreamFilter::class, [ + $filter->streamName, + $filter->aggregateType, + $filter->eventStoreReferenceName, + ]); + } + } + + $messagingConfiguration->registerServiceDefinition( + StreamFilterRegistry::class, + new Definition(StreamFilterRegistry::class, [$filtersDefinition]) + ); + } + + public function canHandle($extensionObject): bool + { + return false; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array + { + return []; + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } +} + diff --git a/packages/Ecotone/src/Projecting/PartitionProvider.php b/packages/Ecotone/src/Projecting/PartitionProvider.php index 9b4cbbf99..39f66314a 100644 --- a/packages/Ecotone/src/Projecting/PartitionProvider.php +++ b/packages/Ecotone/src/Projecting/PartitionProvider.php @@ -10,20 +10,22 @@ interface PartitionProvider { /** - * Returns the total count of partitions. + * Returns the total count of partitions for the given stream filter. * For non-partitioned projections, returns 1. * + * @param StreamFilter $filter The stream filter containing stream name and aggregate type * @return int Total number of partitions */ - public function count(): int; + public function count(StreamFilter $filter): int; /** - * Returns partition keys for the projection. + * Returns partition keys for the projection based on the stream filter. * For non-partitioned projections, yields a single null value. * + * @param StreamFilter $filter The stream filter containing stream name and aggregate type * @param int|null $limit Maximum number of partitions to return (null for unlimited) * @param int $offset Number of partitions to skip * @return iterable Partition keys */ - public function partitions(?int $limit = null, int $offset = 0): iterable; + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 89d770f48..4ca23bd73 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -21,6 +21,7 @@ public function __construct( private ProjectorExecutor $projectorExecutor, private StreamSource $streamSource, private PartitionProvider $partitionProvider, + private StreamFilterRegistry $streamFilterRegistry, private string $projectionName, private TerminationListener $terminationListener, private MessagingEntrypoint $messagingEntrypoint, @@ -122,7 +123,16 @@ public function delete(): void */ public function prepareBackfill(): void { - $totalPartitions = $this->partitionProvider->count(); + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + + foreach ($streamFilters as $streamFilter) { + $this->prepareBackfillForFilter($streamFilter); + } + } + + private function prepareBackfillForFilter(StreamFilter $streamFilter): void + { + $totalPartitions = $this->partitionProvider->count($streamFilter); if ($totalPartitions === 0) { return; @@ -138,6 +148,9 @@ public function prepareBackfill(): void [ 'backfill.limit' => $this->backfillBatchSize, 'backfill.offset' => $offset, + 'backfill.streamName' => $streamFilter->streamName, + 'backfill.aggregateType' => $streamFilter->aggregateType, + 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, ], BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL ); diff --git a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php index 8ac349cc1..5eccfbca2 100644 --- a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php +++ b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php @@ -9,12 +9,12 @@ class SinglePartitionProvider implements PartitionProvider { - public function count(): int + public function count(StreamFilter $filter): int { return 1; } - public function partitions(?int $limit = null, int $offset = 0): iterable + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable { // Global projection has a single null partition // If offset is 0 and limit allows at least 1, yield the single partition diff --git a/packages/Ecotone/src/Projecting/StreamFilter.php b/packages/Ecotone/src/Projecting/StreamFilter.php new file mode 100644 index 000000000..de6af389e --- /dev/null +++ b/packages/Ecotone/src/Projecting/StreamFilter.php @@ -0,0 +1,25 @@ + $filters key is projection name + */ + public function __construct( + private array $filters = [], + ) { + } + + /** + * @return StreamFilter[] + */ + public function provide(string $projectionName): array + { + return $this->filters[$projectionName] ?? []; + } +} + diff --git a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php deleted file mode 100644 index bfc67f101..000000000 --- a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php +++ /dev/null @@ -1,42 +0,0 @@ -findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { @@ -78,69 +77,70 @@ public static function create(AnnotationFinder $annotationRegistrationService, I } /** - * Resolve stream configurations from FromStream attributes. + * Resolve stream configurations from StreamFilters collected by StreamFilterRegistryModule. * + * @param array $allStreamFilters * @return list */ private static function resolveConfigs( AnnotationFinder $annotationRegistrationService, - array $projectionEventNames + array $allStreamFilters, + array $projectionEventNames ): array { $extensions = []; $partitionProviders = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { - $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); - $streamAttributes = [ - ...$annotationRegistrationService->getAnnotationsForClass($classname, FromStream::class), - ...\array_map( - fn(FromAggregateStream $aggregateStreamAttribute) => self::resolveFromAggregateStream($annotationRegistrationService, $aggregateStreamAttribute, $projectionAttribute->name), - $annotationRegistrationService->getAnnotationsForClass($classname, FromAggregateStream::class) - ) - ]; - $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); - - if (empty($streamAttributes)) { + foreach ($allStreamFilters as $projectionName => $streamFilters) { + // Find the projection class to check for Partitioned attribute + $projectionClass = null; + foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { + $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); + if ($projectionAttribute->name === $projectionName) { + $projectionClass = $classname; + break; + } + } + + if ($projectionClass === null) { continue; } - $projectionName = $projectionAttribute->name; + $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($projectionClass, Partitioned::class); $isPartitioned = $partitionedAttribute !== null; // @todo: Partitioned projections cannot be declared with multiple streams because the current partition provider cannot merge partitions from multiple streams. - if ($isPartitioned && count($streamAttributes) > 1) { + if ($isPartitioned && \count($streamFilters) > 1) { throw ConfigurationException::create( "Partitioned projection {$projectionName} cannot declare multiple streams. Use a single aggregate stream or remove #[Partitioned]." ); } $sources = []; - foreach ($streamAttributes as $streamAttribute) { - if ($isPartitioned && ! $streamAttribute->aggregateType) { + foreach ($streamFilters as $streamFilter) { + if ($isPartitioned && ! $streamFilter->aggregateType) { throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } if ($isPartitioned) { - $sourceIdentifier = $streamAttribute->stream.'.'.$streamAttribute->aggregateType; + $sourceIdentifier = $streamFilter->streamName . '.' . $streamFilter->aggregateType; $sources[$sourceIdentifier] = new EventStoreAggregateStreamSourceBuilder( $projectionName, - $streamAttribute->aggregateType, - $streamAttribute->stream, + $streamFilter->aggregateType, + $streamFilter->streamName, $projectionEventNames[$projectionName] ?? [], ); - $partitionProviders[$streamAttribute->stream] ??= new AggregateIdPartitionProviderBuilder( - $projectionName, - $streamAttribute->aggregateType, - $streamAttribute->stream, - ); + // Only add partition provider once per projection + if (! isset($partitionProviders[$projectionName])) { + $partitionProviders[$projectionName] = new AggregateIdPartitionProviderBuilder($projectionName); + } } else { - $sources[$streamAttribute->stream] = new EventStoreGlobalStreamSourceBuilder( - $streamAttribute->stream, + $sources[$streamFilter->streamName] = new EventStoreGlobalStreamSourceBuilder( + $streamFilter->streamName, [$projectionName] ); } } - if (count($sources) > 1) { + if (\count($sources) > 1) { $extensions[] = new EventStoreMultiStreamSourceBuilder( $sources, [$projectionName], @@ -154,30 +154,6 @@ private static function resolveConfigs( return $extensions; } - /** - * Resolve stream configurations from FromAggregateStream attributes. - */ - private static function resolveFromAggregateStream( - AnnotationFinder $annotationRegistrationService, - FromAggregateStream $aggregateStreamAttribute, - string $projectionName - ): FromStream { - $aggregateClass = $aggregateStreamAttribute->aggregateClass; - - $eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); - if ($eventSourcingAggregateAttribute === null) { - throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class."); - } - - $streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class); - $streamName = $streamAttribute?->getName() ?? $aggregateClass; - - $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); - $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; - - return new FromStream($streamName, $aggregateType, $aggregateStreamAttribute->eventStoreReferenceName); - } - public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { $dbalConfiguration = ExtensionObjectResolver::resolveUnique(DbalConfiguration::class, $extensionObjects, DbalConfiguration::createWithDefaults()); diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php index 5b9dcd858..af4047800 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php @@ -14,6 +14,7 @@ use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Projecting\PartitionProvider; +use Ecotone\Projecting\StreamFilter; use Enqueue\Dbal\DbalConnectionFactory; use RuntimeException; @@ -21,19 +22,17 @@ class AggregateIdPartitionProvider implements PartitionProvider { public function __construct( private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory, - private string $aggregateType, - private string $streamName, private PdoStreamTableNameProvider $tableNameProvider ) { } - public function count(): int + public function count(StreamFilter $filter): int { $connection = $this->getConnection(); $platform = $connection->getDatabasePlatform(); // Resolve table name at runtime using the provider - $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $streamTable = $this->tableNameProvider->generateTableNameForStream($filter->streamName); // Build platform-specific count query if ($platform instanceof PostgreSQLPlatform) { @@ -42,14 +41,14 @@ public function count(): int SELECT COUNT(DISTINCT metadata->>'_aggregate_id') FROM {$streamTable} WHERE metadata->>'_aggregate_type' = ? - SQL, [$this->aggregateType]); + SQL, [$filter->aggregateType]); } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { // MySQL/MariaDB: Use generated indexed columns for better performance $result = $connection->executeQuery(<<aggregateType]); + SQL, [$filter->aggregateType]); } else { throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); } @@ -57,13 +56,13 @@ public function count(): int return (int) $result->fetchOne(); } - public function partitions(?int $limit = null, int $offset = 0): iterable + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable { $connection = $this->getConnection(); $platform = $connection->getDatabasePlatform(); // Resolve table name at runtime using the provider - $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $streamTable = $this->tableNameProvider->generateTableNameForStream($filter->streamName); // Build pagination clause $limitClause = ''; @@ -81,7 +80,7 @@ public function partitions(?int $limit = null, int $offset = 0): iterable WHERE metadata->>'_aggregate_type' = ? ORDER BY aggregate_id {$limitClause}{$offsetClause} - SQL, [$this->aggregateType]); + SQL, [$filter->aggregateType]); } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { // MySQL/MariaDB: Use generated indexed columns for better performance $query = $connection->executeQuery(<<aggregateType]); + SQL, [$filter->aggregateType]); } else { throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); } diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php index 8223da7ed..42285b468 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php @@ -17,7 +17,7 @@ class AggregateIdPartitionProviderBuilder implements ProjectionComponentBuilder { - public function __construct(public readonly string $handledProjectionName, public readonly ?string $aggregateType, private string $streamName) + public function __construct(public readonly string $handledProjectionName) { } @@ -30,8 +30,6 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc { return new Definition(AggregateIdPartitionProvider::class, [ Reference::to(DbalConnectionFactory::class), - $this->aggregateType, - $this->streamName, Reference::to(PdoStreamTableNameProvider::class), ]); } From af22851a971dc07e1b1b6b842b5a3219c7e23081 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 8 Jan 2026 18:15:07 +0100 Subject: [PATCH 03/10] backfill batch --- .../Attribute/ProjectionBackfill.php | 27 +++++++++++++++++++ .../Attribute/ProjectionBatchSize.php | 9 ++++++- .../EcotoneProjectionExecutorBuilder.php | 15 ++++++++--- .../Config/ProjectingAttributeModule.php | 12 ++++++++- .../Projecting/Config/ProjectingModule.php | 4 +-- .../Config/ProjectionExecutorBuilder.php | 3 ++- ...entStoreStreamingChannelAdapterBuilder.php | 7 ++++- .../src/Projecting/ProjectingManager.php | 22 +++++++-------- .../tests/Projecting/ProjectingTest.php | 2 +- .../Projecting/ProophIntegrationTest.php | 2 +- 10 files changed, 80 insertions(+), 23 deletions(-) create mode 100644 packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php new file mode 100644 index 000000000..7ce9ecaee --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php @@ -0,0 +1,27 @@ +partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -93,9 +95,14 @@ public function automaticInitialization(): bool return $this->automaticInitialization; } - public function batchSize(): int + public function eventLoadingBatchSize(): int { - return $this->batchSize ?? self::DEFAULT_BATCH_SIZE; + return $this->eventLoadingBatchSize ?? self::DEFAULT_EVENT_LOADING_BATCH_SIZE; + } + + public function backfillPartitionBatchSize(): int + { + return $this->backfillPartitionBatchSize ?? self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE; } public function compile(MessagingContainerBuilder $builder): Definition|Reference diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index cca66d24e..f02ffeb1b 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -34,6 +34,7 @@ use Ecotone\Modelling\Attribute\NamedEvent; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\Polling; +use Ecotone\Projecting\Attribute\ProjectionBackfill; use Ecotone\Projecting\Attribute\ProjectionBatchSize; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; @@ -78,6 +79,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class); $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class); + $backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class); $pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class); $streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class); $projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class); @@ -88,7 +90,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $automaticInitialization = self::resolveAutomaticInitialization($partitionAttribute, $projectionDeployment); $isLive = $projectionDeployment?->live ?? true; - $projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $isLive, $namedEvents, batchSize: $batchSizeAttribute?->batchSize); + $projectionBuilder = new EcotoneProjectionExecutorBuilder( + $projectionAttribute->name, + $partitionHeaderName, + $automaticInitialization, + $isLive, + $namedEvents, + eventLoadingBatchSize: $batchSizeAttribute?->eventLoadingBatchSize, + backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize, + ); $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); $isPolling = $pollingAttribute !== null; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 891b672a4..46a060021 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -106,9 +106,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $projectionName, new Reference(TerminationListener::class), new Reference(MessagingEntrypoint::class), - $projectionBuilder->batchSize(), // batchSize + $projectionBuilder->eventLoadingBatchSize(), $projectionBuilder->automaticInitialization(), - 100, // backfillBatchSize - default value, can be made configurable via #[Backfill] attribute + $projectionBuilder->backfillPartitionBatchSize(), ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php index 620c84979..47da187c8 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php @@ -15,5 +15,6 @@ public function projectionName(): string; public function asyncChannelName(): ?string; public function partitionHeader(): ?string; public function automaticInitialization(): bool; - public function batchSize(): int; + public function eventLoadingBatchSize(): int; + public function backfillPartitionBatchSize(): int; } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index 243cf3387..036da1e61 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -45,11 +45,16 @@ public function automaticInitialization(): bool return true; } - public function batchSize(): int + public function eventLoadingBatchSize(): int { return $this->channelAdapter->batchSize; } + public function backfillPartitionBatchSize(): int + { + return 100; // Default value, streaming channel adapters don't support partitioned backfill + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { // Create the projection executor that forwards events to the streaming channel diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 4ca23bd73..29336f079 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -14,7 +14,7 @@ class ProjectingManager { - private const DEFAULT_BACKFILL_BATCH_SIZE = 100; + private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; public function __construct( private ProjectionStateStorage $projectionStateStorage, @@ -25,15 +25,15 @@ public function __construct( private string $projectionName, private TerminationListener $terminationListener, private MessagingEntrypoint $messagingEntrypoint, - private int $batchSize = 1000, + private int $eventLoadingBatchSize = 1000, private bool $automaticInitialization = true, - private int $backfillBatchSize = self::DEFAULT_BACKFILL_BATCH_SIZE, + private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, ) { - if ($batchSize < 1) { - throw new InvalidArgumentException('Batch size must be at least 1'); + if ($eventLoadingBatchSize < 1) { + throw new InvalidArgumentException('Event loading batch size must be at least 1'); } - if ($backfillBatchSize < 1) { - throw new InvalidArgumentException('Backfill batch size must be at least 1'); + if ($backfillPartitionBatchSize < 1) { + throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); } } @@ -57,7 +57,7 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial return 0; } - $streamPage = $this->streamSource->load($projectionState->lastPosition, $this->batchSize, $partitionKeyValue); + $streamPage = $this->streamSource->load($projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); $userState = $projectionState->userState; $processedEvents = 0; @@ -138,15 +138,15 @@ private function prepareBackfillForFilter(StreamFilter $streamFilter): void return; } - $numberOfBatches = (int) ceil($totalPartitions / $this->backfillBatchSize); + $numberOfBatches = (int) ceil($totalPartitions / $this->backfillPartitionBatchSize); for ($batch = 0; $batch < $numberOfBatches; $batch++) { - $offset = $batch * $this->backfillBatchSize; + $offset = $batch * $this->backfillPartitionBatchSize; $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, [ - 'backfill.limit' => $this->backfillBatchSize, + 'backfill.limit' => $this->backfillPartitionBatchSize, 'backfill.offset' => $offset, 'backfill.streamName' => $streamFilter->streamName, 'backfill.aggregateType' => $streamFilter->aggregateType, diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index 11a09cc60..34ff39903 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -592,7 +592,7 @@ public function flush(): void #[RequiresPhpExtension('posix')] public function test_pcntl_signals_handling(): void { - $projection = new #[ProjectionV2('signals_projection'), ProjectionBatchSize(2)] class () { + $projection = new #[ProjectionV2('signals_projection'), ProjectionBatchSize(eventLoadingBatchSize: 2)] class () { public array $processedEvents = []; #[EventHandler('*')] public function handle(array $event): void diff --git a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php index 8d9202580..5bc3df6d0 100644 --- a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php @@ -358,7 +358,7 @@ public function init(): void public function test_it_handles_batches(): void { $connectionFactory = self::getConnectionFactory(); - $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionBatchSize(3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { + $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionBatchSize(eventLoadingBatchSize: 3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { public const NAME = 'batch_projection'; public int $flushCallCount = 0; #[ProjectionFlush] From 83426c48247afe1c56531678c776548a31f33524 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 8 Jan 2026 18:20:28 +0100 Subject: [PATCH 04/10] naming --- .../{ProjectionBatchSize.php => ProjectionExecution.php} | 2 +- .../src/Projecting/Config/ProjectingAttributeModule.php | 4 ++-- packages/Ecotone/tests/Projecting/ProjectingTest.php | 4 ++-- .../tests/Projecting/ProophIntegrationTest.php | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) rename packages/Ecotone/src/Projecting/Attribute/{ProjectionBatchSize.php => ProjectionExecution.php} (94%) diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionExecution.php similarity index 94% rename from packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php rename to packages/Ecotone/src/Projecting/Attribute/ProjectionExecution.php index aa7ac001c..c02e87fd0 100644 --- a/packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionExecution.php @@ -10,7 +10,7 @@ use Attribute; #[Attribute(Attribute::TARGET_CLASS)] -class ProjectionBatchSize +class ProjectionExecution { public function __construct( /** diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index f02ffeb1b..3882265a2 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -35,7 +35,7 @@ use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\Polling; use Ecotone\Projecting\Attribute\ProjectionBackfill; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -78,7 +78,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $eventStreamingProjections = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class); - $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class); + $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class); $backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class); $pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class); $streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class); diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index 34ff39903..6c5191b12 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -21,7 +21,7 @@ use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Event; use Ecotone\Projecting\Attribute\Partitioned; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -592,7 +592,7 @@ public function flush(): void #[RequiresPhpExtension('posix')] public function test_pcntl_signals_handling(): void { - $projection = new #[ProjectionV2('signals_projection'), ProjectionBatchSize(eventLoadingBatchSize: 2)] class () { + $projection = new #[ProjectionV2('signals_projection'), ProjectionExecution(eventLoadingBatchSize: 2)] class () { public array $processedEvents = []; #[EventHandler('*')] public function handle(array $event): void diff --git a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php index 5bc3df6d0..45c5ae03a 100644 --- a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php @@ -17,7 +17,7 @@ use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Projecting\Attribute\Partitioned; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -358,7 +358,7 @@ public function init(): void public function test_it_handles_batches(): void { $connectionFactory = self::getConnectionFactory(); - $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionBatchSize(eventLoadingBatchSize: 3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { + $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionExecution(eventLoadingBatchSize: 3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { public const NAME = 'batch_projection'; public int $flushCallCount = 0; #[ProjectionFlush] From 4ffbf3c89f03bdee3c98a0160b0459e818503147 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 8 Jan 2026 21:00:55 +0100 Subject: [PATCH 05/10] fixes --- .../Config/StreamFilterRegistryModule.php | 6 +++ .../EventStreamingProjectionTest.php | 15 +++--- .../tests/Projecting/ProjectingTest.php | 52 +++++++++++++------ 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php index a77aaea91..9716af76c 100644 --- a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -68,6 +68,12 @@ public static function collectStreamFilters(AnnotationFinder $annotationFinder): foreach ($annotationFinder->getAnnotationsForClass($classname, FromAggregateStream::class) as $aggregateStreamAttribute) { $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName); } + + if (! isset($streamFilters[$projectionName]) || $streamFilters[$projectionName] === []) { + throw ConfigurationException::create( + "Projection '{$projectionName}' must have at least one #[FromStream] or #[FromAggregateStream] attribute defined on class {$classname}." + ); + } } return $streamFilters; diff --git a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php index 866ae678a..1dec00e45 100644 --- a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php +++ b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php @@ -7,6 +7,7 @@ namespace Test\Ecotone\Projecting; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\ModulePackageList; @@ -36,7 +37,7 @@ public function test_event_streaming_projection_consuming_from_streaming_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('user_projection'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('user_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { public array $projectedUsers = []; #[EventHandler] @@ -86,7 +87,7 @@ public function test_event_streaming_projection_with_multiple_event_handlers_rou $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection with two event handlers routed by event names - $projection = new #[ProjectionV2('order_projection'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('order_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { public array $createdOrders = []; public array $completedOrders = []; @@ -150,7 +151,7 @@ public function test_event_streaming_projection_with_event_store_channel_adapter $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('product_projection'), Streaming('event_stream')] class { + $projection = new #[ProjectionV2('product_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $projectedProducts = []; #[EventHandler] @@ -211,7 +212,7 @@ public function test_two_event_streaming_projections_consuming_from_same_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given two projections consuming from the same streaming channel - $productListProjection = new #[ProjectionV2('product_list_projection'), Streaming('event_stream')] class { + $productListProjection = new #[ProjectionV2('product_list_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] @@ -221,7 +222,7 @@ public function onProductRegistered(ProductRegistered $event): void } }; - $productPriceProjection = new #[ProjectionV2('product_price_projection'), Streaming('event_stream')] class { + $productPriceProjection = new #[ProjectionV2('product_price_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productPrices = []; #[EventHandler] @@ -293,7 +294,7 @@ public function test_event_driven_projection_combined_with_event_streaming_proje $positionTracker = new InMemoryConsumerPositionTracker(); // Given an event-driven projection (catches up from stream when triggered) - $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count')] class { + $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('test_stream')] class { public int $productCount = 0; #[EventHandler] @@ -304,7 +305,7 @@ public function onProductRegistered(ProductRegistered $event): void }; // Given an event streaming projection (processes events in polling mode from streaming channel) - $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), Streaming('event_stream')] class { + $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index 6c5191b12..e162381d0 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -7,6 +7,7 @@ namespace Test\Ecotone\Projecting; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Asynchronous; @@ -40,7 +41,7 @@ class ProjectingTest extends TestCase public function test_asynchronous_projection(): void { // Given an asynchronous projection - $projection = new #[ProjectionV2('test'), Asynchronous('async')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Asynchronous('async')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -71,7 +72,7 @@ public function handle(array $event): void public function test_partitioned_projection(): void { // Given a partitioned projection - $projection = new #[ProjectionV2('test'), Partitioned('partitionHeader')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -105,7 +106,7 @@ public function handle(array $event): void public function test_asynchronous_partitioned_projection(): void { // Given a partitioned async projection - $projection = new #[ProjectionV2('test'), Partitioned('partitionHeader'), Asynchronous('async')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader'), Asynchronous('async')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -141,7 +142,7 @@ public function handle(array $event): void public function test_it_can_init_projection_lifecycle_state(): void { - $projection = new #[ProjectionV2('projection_with_lifecycle')] class { + $projection = new #[ProjectionV2('projection_with_lifecycle'), FromStream('test_stream')] class { public const TICKET_CREATED = 'ticket.created'; private bool $initialized = false; public array $projectedEvents = []; @@ -184,7 +185,7 @@ public function init(): void public function test_it_skips_execution_when_automatic_initialization_is_off_and_not_initialized(): void { - $projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('projection_with_manual_initialization'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; @@ -224,7 +225,7 @@ public function on(array $event): void public function test_init_partition_concurrency_protection(): void { - $projection = new #[ProjectionV2('concurrent_projection')] class { + $projection = new #[ProjectionV2('concurrent_projection'), FromStream('test_stream')] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -267,7 +268,7 @@ public function init(): void public function test_auto_initialization_mode_processes_events(): void { - $projection = new #[ProjectionV2('auto_projection'), ProjectionDeployment(manualKickOff: false)] class { + $projection = new #[ProjectionV2('auto_projection'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: false)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -307,7 +308,7 @@ public function init(): void public function test_skip_initialization_mode_skips_events_when_not_initialized(): void { - $projection = new #[ProjectionV2('skip_projection'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('skip_projection'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -347,7 +348,7 @@ public function init(): void public function test_skip_mode_with_multiple_events(): void { - $projection = new #[ProjectionV2('skip_multiple_events'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('skip_multiple_events'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -390,7 +391,7 @@ public function init(): void public function test_auto_mode_with_multiple_events(): void { - $projection = new #[ProjectionV2('auto_multiple_events'), ProjectionDeployment(manualKickOff: false)] class { + $projection = new #[ProjectionV2('auto_multiple_events'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: false)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -432,7 +433,7 @@ public function init(): void public function test_projection_with_partitioned_events(): void { - $projection = new #[ProjectionV2('partitioned_auto_projection'), Partitioned('tenantId')] class { + $projection = new #[ProjectionV2('partitioned_auto_projection'), FromStream('test_stream'), Partitioned('tenantId')] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -475,7 +476,7 @@ public function test_it_throws_exception_when_no_licence(): void $this->expectException(ConfigurationException::class); $this->expectExceptionMessage('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.'); - $projection = new #[ProjectionV2('test')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { #[EventHandler('*')] public function handle(array $event): void { @@ -490,10 +491,29 @@ public function handle(array $event): void ); } + public function test_it_throws_exception_when_no_stream_defined(): void + { + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("Projection 'test_no_stream' must have at least one #[FromStream] or #[FromAggregateStream] attribute defined"); + + $projection = new #[ProjectionV2('test_no_stream')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + } + public function test_it_with_event_handler_priority(): void { $db = []; - $projectionA = new #[ProjectionV2('A')] class ($db) { + $projectionA = new #[ProjectionV2('A'), FromStream('test_stream')] class ($db) { public function __construct(private array &$db) { } @@ -509,7 +529,7 @@ public function handleHighPriority(array $event): void $this->db[] = 'projectionA-with-priority'; } }; - $projectionB = new #[ProjectionV2('B')] class ($db) { + $projectionB = new #[ProjectionV2('B'), FromStream('test_stream')] class ($db) { public function __construct(private array &$db) { } @@ -551,7 +571,7 @@ public function handleHighPriority(array $event): void public function test_it_can_flush_by_batches(): void { - $projection = new #[ProjectionV2('batch_projection')] class () { + $projection = new #[ProjectionV2('batch_projection'), FromStream('test_stream')] class () { public array $processingEvents = []; public array $flushedEvents = []; #[EventHandler('*')] @@ -592,7 +612,7 @@ public function flush(): void #[RequiresPhpExtension('posix')] public function test_pcntl_signals_handling(): void { - $projection = new #[ProjectionV2('signals_projection'), ProjectionExecution(eventLoadingBatchSize: 2)] class () { + $projection = new #[ProjectionV2('signals_projection'), FromStream('test_stream'), ProjectionExecution(eventLoadingBatchSize: 2)] class () { public array $processedEvents = []; #[EventHandler('*')] public function handle(array $event): void From d4c54e98bc591e8418650c9e6baf730fdae2bd69 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 8 Jan 2026 21:56:32 +0100 Subject: [PATCH 06/10] async backfill --- .../Attribute/ProjectionBackfill.php | 13 +- .../EcotoneProjectionExecutorBuilder.php | 6 + .../Config/ProjectingAttributeModule.php | 1 + .../Projecting/Config/ProjectingModule.php | 1 + .../Config/ProjectionExecutorBuilder.php | 1 + ...entStoreStreamingChannelAdapterBuilder.php | 5 + .../src/Projecting/ProjectingManager.php | 31 +- .../InMemoryEventStoreRegistrationTest.php | 4 +- .../Projecting/BackfillProjectionTest.php | 329 ++++++++++++++++++ 9 files changed, 381 insertions(+), 10 deletions(-) create mode 100644 packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php index 7ce9ecaee..200f1a2ab 100644 --- a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php @@ -8,6 +8,7 @@ namespace Ecotone\Projecting\Attribute; use Attribute; +use InvalidArgumentException; /** * Configure projection backfill settings. @@ -19,9 +20,19 @@ class ProjectionBackfill public function __construct( /** * Number of partitions to process in a single batch during backfill. + * Must be at least 1. */ - public readonly int $backfillPartitionBatchSize = 100 + public readonly int $backfillPartitionBatchSize = 100, + /** + * Async channel name for backfill operations. + * When set, backfill batches are sent to this channel first, then routed to the backfill handler. + * When null, backfill executes synchronously. + */ + public readonly ?string $asyncChannelName = null, ) { + if ($this->backfillPartitionBatchSize < 1) { + throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); + } } } diff --git a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php index 488e471fb..a2224250f 100644 --- a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php @@ -44,6 +44,7 @@ public function __construct( private ?string $asyncChannelName = null, private ?int $eventLoadingBatchSize = null, private ?int $backfillPartitionBatchSize = null, + private ?string $backfillAsyncChannelName = null, ) { if ($this->partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -105,6 +106,11 @@ public function backfillPartitionBatchSize(): int return $this->backfillPartitionBatchSize ?? self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE; } + public function backfillAsyncChannelName(): ?string + { + return $this->backfillAsyncChannelName; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { $routerProcessor = $this->buildExecutionRouter($builder); diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index 3882265a2..2c8ce7e9f 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -98,6 +98,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $namedEvents, eventLoadingBatchSize: $batchSizeAttribute?->eventLoadingBatchSize, backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize, + backfillAsyncChannelName: $backfillAttribute?->asyncChannelName, ); $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 46a060021..2ad37e20e 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -109,6 +109,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $projectionBuilder->eventLoadingBatchSize(), $projectionBuilder->automaticInitialization(), $projectionBuilder->backfillPartitionBatchSize(), + $projectionBuilder->backfillAsyncChannelName(), ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php index 47da187c8..e51f3dda5 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php @@ -17,4 +17,5 @@ public function partitionHeader(): ?string; public function automaticInitialization(): bool; public function eventLoadingBatchSize(): int; public function backfillPartitionBatchSize(): int; + public function backfillAsyncChannelName(): ?string; } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index 036da1e61..bfef18135 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -55,6 +55,11 @@ public function backfillPartitionBatchSize(): int return 100; // Default value, streaming channel adapters don't support partitioned backfill } + public function backfillAsyncChannelName(): ?string + { + return null; // Streaming channel adapters don't support async backfill + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { // Create the projection executor that forwards events to the streaming channel diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 29336f079..955c946d7 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -28,6 +28,7 @@ public function __construct( private int $eventLoadingBatchSize = 1000, private bool $automaticInitialization = true, private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, + private ?string $backfillAsyncChannelName = null, ) { if ($eventLoadingBatchSize < 1) { throw new InvalidArgumentException('Event loading batch size must be at least 1'); @@ -143,15 +144,31 @@ private function prepareBackfillForFilter(StreamFilter $streamFilter): void for ($batch = 0; $batch < $numberOfBatches; $batch++) { $offset = $batch * $this->backfillPartitionBatchSize; + $headers = [ + 'backfill.limit' => $this->backfillPartitionBatchSize, + 'backfill.offset' => $offset, + 'backfill.streamName' => $streamFilter->streamName, + 'backfill.aggregateType' => $streamFilter->aggregateType, + 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, + ]; + + $this->sendBackfillMessage($headers); + } + } + + private function sendBackfillMessage(array $headers): void + { + if ($this->backfillAsyncChannelName !== null) { + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + $headers, + $this->backfillAsyncChannelName, + BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + ); + } else { $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, - [ - 'backfill.limit' => $this->backfillPartitionBatchSize, - 'backfill.offset' => $offset, - 'backfill.streamName' => $streamFilter->streamName, - 'backfill.aggregateType' => $streamFilter->aggregateType, - 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, - ], + $headers, BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL ); } diff --git a/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php b/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php index 749f84721..b840f0580 100644 --- a/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php +++ b/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php @@ -4,6 +4,7 @@ namespace Test\Ecotone\EventSourcing\InMemory; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Converter; @@ -24,8 +25,7 @@ class InMemoryEventStoreRegistrationTest extends TestCase { public function test_registers_in_memory_event_store_stream_source_when_pdo_event_sourcing_is_in_memory_mode(): void { - // Given a polling projection (polling projections read from stream sources) - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class { public array $events = []; public int $callCount = 0; diff --git a/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php new file mode 100644 index 000000000..f7f7922fd --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php @@ -0,0 +1,329 @@ +connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +/** + * licence Enterprise + * @internal + */ +final class BackfillProjectionTest extends ProjectingTestCase +{ + public function test_throws_exception_when_backfill_batch_size_is_less_than_one(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Backfill partition batch size must be at least 1'); + + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch0_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 0), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + protected function tableName(): string + { + return 'batch0_tickets'; + } + }; + + $this->bootstrapEcotone([$projection::class], [$projection], true); + } + + public function test_partitioned_projection_async_backfill_with_batch_of_2_processes_5_partitions_in_3_runs(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch2_async_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 2, asyncChannelName: 'backfill_channel'), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getBackfillTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'batch2_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_channel') + ); + + $this->createPartitions($ecotone, 5); + + $ecotone->deleteProjection('batch2_async_projection') + ->initializeProjection('batch2_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'batch2_async_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_channel'); + self::assertCount(3, $messages); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(4, $ecotone->sendQueryWithRouting('getBackfillTickets')); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getBackfillTickets')); + } + + public function test_partitioned_projection_async_backfill_with_batch_of_5_completes_in_single_run(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch5_async_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 5, asyncChannelName: 'backfill_channel'), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getBackfillTickets5')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'batch5_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_channel') + ); + + $this->createPartitions($ecotone, 5); + + $ecotone->deleteProjection('batch5_async_projection') + ->initializeProjection('batch5_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'batch5_async_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_channel'); + self::assertCount(1, $messages); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getBackfillTickets5')); + } + + public function test_partitioned_projection_sync_backfill_processes_all_partitions_immediately(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('sync_partitioned_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 2), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getSyncBackfillTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'sync_partitioned_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->deleteProjection('sync_partitioned_projection') + ->initializeProjection('sync_partitioned_projection'); + + $this->createPartitions($ecotone, 5); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'sync_partitioned_projection']); + + self::assertCount(5, $ecotone->sendQueryWithRouting('getSyncBackfillTickets')); + } + + public function test_global_projection_async_backfill_processes_all_events_after_running_channel(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('global_async_projection'), + ProjectionBackfill(asyncChannelName: 'backfill_global_channel'), + FromStream(Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getGlobalAsyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'global_async_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_global_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_global_channel') + ); + + $this->createTickets($ecotone, 3); + + $ecotone->deleteProjection('global_async_projection') + ->initializeProjection('global_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'global_async_projection']); + + self::assertCount(0, $ecotone->sendQueryWithRouting('getGlobalAsyncTickets')); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_global_channel'); + self::assertCount(1, $messages); + + $ecotone->run('backfill_global_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(3, $ecotone->sendQueryWithRouting('getGlobalAsyncTickets')); + } + + public function test_global_projection_sync_backfill_processes_all_events_immediately(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('global_sync_projection'), + ProjectionBackfill(), + FromStream(Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getGlobalSyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'global_sync_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->deleteProjection('global_sync_projection') + ->initializeProjection('global_sync_projection'); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'global_sync_projection']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getGlobalSyncTickets')); + } + + private function createPartitions(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string)$i, "User{$i}", "type{$i}")); + } + } + + private function createTickets(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string)$i, "User{$i}", 'alert')); + } + } + + private function bootstrapEcotone(array $classesToResolve, array $services, bool|array $channels, ?TestConfiguration $testConfiguration = null): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [...$classesToResolve, Ticket::class, TicketEventConverter::class], + containerOrAvailableServices: [...$services, new TicketEventConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: $channels, + licenceKey: LicenceTesting::VALID_LICENCE, + testConfiguration: $testConfiguration, + ); + } +} From bd19befb2bdd9b08b84e14009fcc10a70e6dc7c6 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Fri, 9 Jan 2026 10:16:16 +0100 Subject: [PATCH 07/10] fixes --- .../Lite/InMemoryEventStoreRegistrationTest.php | 12 +++--------- .../src/Config/ProophProjectingModule.php | 6 +++--- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php index 974058340..5040ad372 100644 --- a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php +++ b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php @@ -4,6 +4,7 @@ namespace Test\Ecotone\Lite; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Converter; use Ecotone\Messaging\Config\ModulePackageList; @@ -25,15 +26,13 @@ class InMemoryEventStoreRegistrationTest extends TestCase { public function test_registers_in_memory_event_store_stream_source_when_pdo_event_sourcing_is_disabled(): void { - // Given a test event class $testEvent = new class () { public function __construct(public int $id = 0, public string $name = '') { } }; - // Given a polling projection (polling projections read from stream sources) - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class ($testEvent) { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class ($testEvent) { public array $events = []; public int $callCount = 0; private string $eventClass; @@ -77,18 +76,13 @@ public function onEvent(object $event): void public function test_does_not_register_in_memory_stream_source_when_custom_stream_source_is_provided(): void { - // This test verifies that when a custom stream source is provided, - // the InMemoryEventStoreStreamSourceBuilder is NOT registered - - // Given a test event class $testEvent = new class () { public function __construct(public int $id = 0, public string $name = '') { } }; - // Given a polling projection - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class { public array $events = []; public int $callCount = 0; diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 62dedb032..df8648f2e 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -175,7 +175,8 @@ public function canHandle($extensionObject): bool public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array { - $extensions = [...$this->extensions]; + $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); + $extensions = $eventSourcingConfiguration->isInMemory() ? [] : [...$this->extensions]; foreach ($serviceExtensions as $extensionObject) { if (! ($extensionObject instanceof EventStreamingChannelAdapter)) { @@ -187,11 +188,10 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, $extensionObject->fromStream, [$projectionName] ); - } + }; $extensions[] = new DbalTableManagerReference(ProjectionStateTableManager::class); - $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); $eventStreamingChannelAdapters = ExtensionObjectResolver::resolve(EventStreamingChannelAdapter::class, $serviceExtensions); if (($this->projectionNames || $eventStreamingChannelAdapters) && ! $eventSourcingConfiguration->isInMemory()) { From 0d51369575de6eeb69d737f557424fdbfaa84d50 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Fri, 9 Jan 2026 10:49:04 +0100 Subject: [PATCH 08/10] use stream filter --- .../EcotoneConfiguration.php | 4 +- .../Config/StreamFilterRegistryModule.php | 93 ++++++++++++++--- .../Ecotone/src/Projecting/StreamFilter.php | 6 +- .../src/Config/ProophProjectingModule.php | 99 ++----------------- .../EventStoreAggregateStreamSource.php | 23 ++--- ...EventStoreAggregateStreamSourceBuilder.php | 20 ++-- .../EventStoreGlobalStreamSource.php | 22 ++--- .../EventStoreGlobalStreamSourceBuilder.php | 10 +- 8 files changed, 127 insertions(+), 150 deletions(-) diff --git a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php b/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php index 3b56d4390..9d682ce92 100644 --- a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php +++ b/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php @@ -9,6 +9,7 @@ use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder; use Ecotone\Messaging\Attribute\ServiceContext; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\StreamFilter; use Monorepo\ExampleAppEventSourcing\Common\Product; class EcotoneConfiguration @@ -16,7 +17,6 @@ class EcotoneConfiguration #[ServiceContext] public function enableStreamSourceFromEventStore(): ProjectionComponentBuilder { - return new EventStoreGlobalStreamSourceBuilder(Product::class, [PriceChangeOverTimeProjectionWithEcotoneProjection::NAME]); -// return new EventStoreAggregateStreamSourceBuilder(PriceChangeOverTimeProjectionWithEcotoneProjection::NAME, Product::class, Product::class); + return new EventStoreGlobalStreamSourceBuilder(new StreamFilter(Product::class), [PriceChangeOverTimeProjectionWithEcotoneProjection::NAME]); } } \ No newline at end of file diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php index 9716af76c..a58732ca3 100644 --- a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -21,15 +21,14 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; +use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\EventSourcingAggregate; +use Ecotone\Modelling\Attribute\NamedEvent; +use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamFilterRegistry; -/** - * Collects stream filters from #[FromStream] and #[FromAggregateStream] attributes - * and registers StreamFilterRegistry as a service. - */ #[ModuleAnnotation] class StreamFilterRegistryModule implements AnnotationModule { @@ -40,33 +39,36 @@ public function __construct(private array $streamFilters) public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - return new self(self::collectStreamFilters($annotationRegistrationService)); + return new self(self::collectStreamFilters($annotationRegistrationService, $interfaceToCallRegistry)); } /** - * Collects stream filters from all #[ProjectionV2] classes. - * This method can be reused by other modules that need stream filter information. - * * @return array Map of projection name to stream filters */ - public static function collectStreamFilters(AnnotationFinder $annotationFinder): array + public static function collectStreamFilters(AnnotationFinder $annotationFinder, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array { + $projectionEventNames = $interfaceToCallRegistry !== null + ? self::collectProjectionEventNames($annotationFinder, $interfaceToCallRegistry) + : []; + $streamFilters = []; foreach ($annotationFinder->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationFinder->getAttributeForClass($classname, ProjectionV2::class); $projectionName = $projectionAttribute->name; + $eventNames = $projectionEventNames[$projectionName] ?? []; foreach ($annotationFinder->getAnnotationsForClass($classname, FromStream::class) as $streamAttribute) { $streamFilters[$projectionName][] = new StreamFilter( $streamAttribute->stream, $streamAttribute->aggregateType, $streamAttribute->eventStoreReferenceName, + $eventNames, ); } foreach ($annotationFinder->getAnnotationsForClass($classname, FromAggregateStream::class) as $aggregateStreamAttribute) { - $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName); + $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName, $eventNames); } if (! isset($streamFilters[$projectionName]) || $streamFilters[$projectionName] === []) { @@ -79,10 +81,74 @@ public static function collectStreamFilters(AnnotationFinder $annotationFinder): return $streamFilters; } + /** + * @param array $namedEvents Map of class name to named event name + * @return array> Map of projection name to event names (empty array means no filtering) + */ + public static function collectProjectionEventNames( + AnnotationFinder $annotationFinder, + InterfaceToCallRegistry $interfaceToCallRegistry, + ): array { + $namedEvents = []; + foreach ($annotationFinder->findAnnotatedClasses(NamedEvent::class) as $className) { + $attribute = $annotationFinder->getAttributeForClass($className, NamedEvent::class); + $namedEvents[$className] = $attribute->getName(); + } + + $projectionEventNames = []; + $disabledFiltering = []; + $routingMapBuilder = new BusRoutingMapBuilder(); + + foreach ($annotationFinder->findCombined(ProjectionV2::class, EventHandler::class) as $projectionEventHandler) { + /** @var ProjectionV2 $projectionAttribute */ + $projectionAttribute = $projectionEventHandler->getAnnotationForClass(); + $projectionName = $projectionAttribute->name; + + if (! isset($projectionEventNames[$projectionName])) { + $projectionEventNames[$projectionName] = []; + } + + if (isset($disabledFiltering[$projectionName])) { + continue; + } + + $routes = $routingMapBuilder->getRoutesFromAnnotatedFinding($projectionEventHandler, $interfaceToCallRegistry); + foreach ($routes as $route) { + if ($route === '*' || $route === 'object') { + $projectionEventNames[$projectionName] = []; + $disabledFiltering[$projectionName] = true; + break; + } + + if (str_contains($route, '*')) { + throw ConfigurationException::create( + "Projection {$projectionName} uses glob pattern '{$route}' which is not allowed. " . + 'For query optimization, event handlers must use explicit event names. Use union type parameters instead.' + ); + } + + if (class_exists($route) && isset($namedEvents[$route])) { + $projectionEventNames[$projectionName][] = $namedEvents[$route]; + } else { + $projectionEventNames[$projectionName][] = $route; + } + } + } + + foreach ($projectionEventNames as $projectionName => $eventNames) { + if (! isset($disabledFiltering[$projectionName]) && $eventNames !== []) { + $projectionEventNames[$projectionName] = array_values(array_unique($eventNames)); + } + } + + return $projectionEventNames; + } + private static function resolveFromAggregateStream( AnnotationFinder $annotationFinder, FromAggregateStream $attribute, - string $projectionName + string $projectionName, + array $eventNames = [] ): StreamFilter { $aggregateClass = $attribute->aggregateClass; @@ -91,19 +157,17 @@ private static function resolveFromAggregateStream( throw ConfigurationException::create("Class {$aggregateClass} referenced in #[FromAggregateStream] for projection {$projectionName} must be an EventSourcingAggregate."); } - // Resolve stream name from #[Stream] attribute if available $streamName = $aggregateClass; if (class_exists(Stream::class)) { $streamAttribute = $annotationFinder->findAttributeForClass($aggregateClass, Stream::class); $streamName = $streamAttribute?->getName() ?? $aggregateClass; } - // Resolve aggregate type from #[AggregateType] attribute if available $aggregateType = $aggregateClass; $aggregateTypeAttribute = $annotationFinder->findAttributeForClass($aggregateClass, AggregateType::class); $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; - return new StreamFilter($streamName, $aggregateType, $attribute->eventStoreReferenceName); + return new StreamFilter($streamName, $aggregateType, $attribute->eventStoreReferenceName, $eventNames); } public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void @@ -116,6 +180,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $filter->streamName, $filter->aggregateType, $filter->eventStoreReferenceName, + $filter->eventNames, ]); } } diff --git a/packages/Ecotone/src/Projecting/StreamFilter.php b/packages/Ecotone/src/Projecting/StreamFilter.php index de6af389e..212efe65b 100644 --- a/packages/Ecotone/src/Projecting/StreamFilter.php +++ b/packages/Ecotone/src/Projecting/StreamFilter.php @@ -11,14 +11,18 @@ /** * Value object representing a filter for stream-based projections. - * Contains stream name, optional aggregate type, and event store reference. + * Contains stream name, optional aggregate type, event store reference, and event names. */ final class StreamFilter { + /** + * @param array $eventNames Event names to filter, empty array means no filtering + */ public function __construct( public readonly string $streamName, public readonly ?string $aggregateType = null, public readonly string $eventStoreReferenceName = EventStore::class, + public readonly array $eventNames = [], ) { } } diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index df8648f2e..2b723ac34 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -27,9 +27,6 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; -use Ecotone\Modelling\Attribute\EventHandler; -use Ecotone\Modelling\Attribute\NamedEvent; -use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Projecting\Config\ProjectionComponentBuilder; @@ -52,17 +49,9 @@ public function __construct( public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - $namedEvents = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(NamedEvent::class) as $className) { - $attribute = $annotationRegistrationService->getAttributeForClass($className, NamedEvent::class); - $namedEvents[$className] = $attribute->getName(); - } - - $projectionEventNames = self::collectProjectionEventNames($annotationRegistrationService, $interfaceToCallRegistry, $namedEvents); - - $allStreamFilters = StreamFilterRegistryModule::collectStreamFilters($annotationRegistrationService); + $allStreamFilters = StreamFilterRegistryModule::collectStreamFilters($annotationRegistrationService, $interfaceToCallRegistry); - $extensions = self::resolveConfigs($annotationRegistrationService, $allStreamFilters, $projectionEventNames); + $extensions = self::resolveConfigs($annotationRegistrationService, $allStreamFilters); $projectionNames = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { @@ -77,22 +66,17 @@ public static function create(AnnotationFinder $annotationRegistrationService, I } /** - * Resolve stream configurations from StreamFilters collected by StreamFilterRegistryModule. - * * @param array $allStreamFilters * @return list */ private static function resolveConfigs( AnnotationFinder $annotationRegistrationService, array $allStreamFilters, - array $projectionEventNames - ): array - { + ): array { $extensions = []; $partitionProviders = []; foreach ($allStreamFilters as $projectionName => $streamFilters) { - // Find the projection class to check for Partitioned attribute $projectionClass = null; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); @@ -109,7 +93,6 @@ private static function resolveConfigs( $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($projectionClass, Partitioned::class); $isPartitioned = $partitionedAttribute !== null; - // @todo: Partitioned projections cannot be declared with multiple streams because the current partition provider cannot merge partitions from multiple streams. if ($isPartitioned && \count($streamFilters) > 1) { throw ConfigurationException::create( "Partitioned projection {$projectionName} cannot declare multiple streams. Use a single aggregate stream or remove #[Partitioned]." @@ -125,17 +108,14 @@ private static function resolveConfigs( $sourceIdentifier = $streamFilter->streamName . '.' . $streamFilter->aggregateType; $sources[$sourceIdentifier] = new EventStoreAggregateStreamSourceBuilder( $projectionName, - $streamFilter->aggregateType, - $streamFilter->streamName, - $projectionEventNames[$projectionName] ?? [], + $streamFilter, ); - // Only add partition provider once per projection if (! isset($partitionProviders[$projectionName])) { $partitionProviders[$projectionName] = new AggregateIdPartitionProviderBuilder($projectionName); } } else { $sources[$streamFilter->streamName] = new EventStoreGlobalStreamSourceBuilder( - $streamFilter->streamName, + $streamFilter, [$projectionName] ); } @@ -185,10 +165,10 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, $projectionName = $extensionObject->getProjectionName(); $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $extensionObject->fromStream, + new StreamFilter($extensionObject->fromStream), [$projectionName] ); - }; + } $extensions[] = new DbalTableManagerReference(ProjectionStateTableManager::class); @@ -207,69 +187,4 @@ public function getModulePackageName(): string { return ModulePackageList::EVENT_SOURCING_PACKAGE; } - - /** - * Collect event names for each partitioned projection. - * Returns empty array for projections that use catch-all patterns or object types. - * - * @param array $namedEvents Map of class name to named event name - * @return array> Map of projection name to event names (empty array means no filtering) - */ - private static function collectProjectionEventNames( - AnnotationFinder $annotationRegistrationService, - InterfaceToCallRegistry $interfaceToCallRegistry, - array $namedEvents - ): array { - $projectionEventNames = []; - $disabledFiltering = []; - $routingMapBuilder = new BusRoutingMapBuilder(); - - foreach ($annotationRegistrationService->findCombined(ProjectionV2::class, EventHandler::class) as $projectionEventHandler) { - /** @var ProjectionV2 $projectionAttribute */ - $projectionAttribute = $projectionEventHandler->getAnnotationForClass(); - $projectionName = $projectionAttribute->name; - - if (! isset($projectionEventNames[$projectionName])) { - $projectionEventNames[$projectionName] = []; - } - - if (isset($disabledFiltering[$projectionName])) { - continue; - } - - $routes = $routingMapBuilder->getRoutesFromAnnotatedFinding($projectionEventHandler, $interfaceToCallRegistry); - foreach ($routes as $route) { - // Check for catch-all pattern - disable filtering by keeping empty array - if ($route === '*' || $route === 'object') { - $projectionEventNames[$projectionName] = []; - $disabledFiltering[$projectionName] = true; - break; - } - - // Check for glob patterns (containing * but not exactly *) - if (str_contains($route, '*')) { - throw ConfigurationException::create( - "Projection {$projectionName} uses glob pattern '{$route}' which is not allowed. " . - 'For query optimization, event handlers must use explicit event names. Use union type parameters instead.' - ); - } - - // Check if route is a class with NamedEvent annotation - if (class_exists($route) && isset($namedEvents[$route])) { - $projectionEventNames[$projectionName][] = $namedEvents[$route]; - } else { - $projectionEventNames[$projectionName][] = $route; - } - } - } - - // Deduplicate event names (skip disabled ones which are empty arrays) - foreach ($projectionEventNames as $projectionName => $eventNames) { - if (! isset($disabledFiltering[$projectionName]) && $eventNames !== []) { - $projectionEventNames[$projectionName] = array_values(array_unique($eventNames)); - } - } - - return $projectionEventNames; - } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index 35f53d069..336de6319 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -13,20 +13,16 @@ use Ecotone\EventSourcing\EventStore\Operator; use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\Support\Assert; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use RuntimeException; class EventStoreAggregateStreamSource implements StreamSource { - /** - * @param array $eventNames Event names to filter by, empty array means no filtering - */ public function __construct( - private EventStore $eventStore, - private string $streamName, - private ?string $aggregateType, - private array $eventNames = [], + private EventStore $eventStore, + private StreamFilter $streamFilter, ) { } @@ -34,17 +30,16 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = { Assert::notNull($partitionKey, 'Partition key cannot be null for aggregate stream source'); - if (! $this->eventStore->hasStream($this->streamName)) { + if (! $this->eventStore->hasStream($this->streamFilter->streamName)) { return new StreamPage([], $lastPosition ?? ''); } $metadataMatcher = new MetadataMatcher(); - if ($this->aggregateType !== null) { - // @todo: watch out ! Prooph's event store has an index on (aggregate_type, aggregate_id). Not adding aggregate type here will result in a full table scan + if ($this->streamFilter->aggregateType !== null) { $metadataMatcher = $metadataMatcher->withMetadataMatch( MessageHeaders::EVENT_AGGREGATE_TYPE, Operator::EQUALS, - $this->aggregateType + $this->streamFilter->aggregateType ); } $metadataMatcher = $metadataMatcher->withMetadataMatch( @@ -58,17 +53,17 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = (int)$lastPosition + 1 ); - if ($this->eventNames !== []) { + if ($this->streamFilter->eventNames !== []) { $metadataMatcher = $metadataMatcher->withMetadataMatch( 'event_name', Operator::IN, - $this->eventNames, + $this->streamFilter->eventNames, FieldType::MESSAGE_PROPERTY ); } $events = $this->eventStore->load( - $this->streamName, + $this->streamFilter->streamName, 1, $count, $metadataMatcher, diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php index 7b6909bcf..e24b29193 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php @@ -7,23 +7,18 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; -use Ecotone\EventSourcing\EventStore; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamSource; class EventStoreAggregateStreamSourceBuilder implements ProjectionComponentBuilder { - /** - * @param array $eventNames Event names to filter, empty array means no filtering - */ public function __construct( public readonly string $handledProjectionName, - public ?string $aggregateType, - private string $streamName, - private array $eventNames = [], + public readonly StreamFilter $streamFilter, ) { } @@ -37,10 +32,13 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc return new Definition( EventStoreAggregateStreamSource::class, [ - new Reference(EventStore::class), - $this->streamName, - $this->aggregateType, - $this->eventNames, + new Reference($this->streamFilter->eventStoreReferenceName), + new Definition(StreamFilter::class, [ + $this->streamFilter->streamName, + $this->streamFilter->aggregateType, + $this->streamFilter->eventStoreReferenceName, + $this->streamFilter->eventNames, + ]), ], ); } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index 8fed841e1..8ebc2beb2 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -18,7 +18,7 @@ use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Messaging\Support\Assert; -use Ecotone\Modelling\Event; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; @@ -31,7 +31,7 @@ class EventStoreGlobalStreamSource implements StreamSource public function __construct( private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, private EcotoneClockInterface $clock, - private string $streamName, + private StreamFilter $streamFilter, private PdoStreamTableNameProvider $tableNameProvider, private int $maxGapOffset = 5_000, private ?Duration $gapTimeout = null, @@ -53,8 +53,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $connection = $this->getConnection(); - // Resolve table name at runtime using the provider - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) { return new StreamPage([], ''); @@ -111,12 +110,10 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn } $minGap = $gaps[0]; - $maxGap = $gaps[count($gaps) - 1]; + $maxGap = $gaps[\count($gaps) - 1]; - // Resolve table name at runtime - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); - // Query interleaved events in the gap range $interleavedEvents = $connection->executeQuery(<<clock->now()->sub($this->gapTimeout)->unixTime()->inSeconds(); - // Find the highest position with timestamp < timeThreshold - $cutoffPosition = $minGap; // default: keep all gaps + $cutoffPosition = $minGap; foreach ($interleavedEvents as $event) { $interleavedEventPosition = $event['no']; $timestamp = $this->getTimestamp($event['created_at']); if ($timestamp > $timestampThreshold) { - // Event is recent, do not remove any gaps below this position break; } - if (in_array($interleavedEventPosition, $gaps, true)) { - // This position is a gap that could be filled, stop cleaning + if (\in_array($interleavedEventPosition, $gaps, true)) { break; } if ($timestamp < $timestampThreshold && $interleavedEventPosition > $cutoffPosition) { - $cutoffPosition = $interleavedEventPosition + 1; // Remove gaps below this position + $cutoffPosition = $interleavedEventPosition + 1; } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php index 39975353d..7455cc258 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php @@ -14,13 +14,14 @@ use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; class EventStoreGlobalStreamSourceBuilder implements ProjectionComponentBuilder { public function __construct( - private string $streamName, + private StreamFilter $streamFilter, private array $handledProjectionNames, ) { } @@ -37,7 +38,12 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc [ Reference::to(DbalConnectionFactory::class), Reference::to(EcotoneClockInterface::class), - $this->streamName, + new Definition(StreamFilter::class, [ + $this->streamFilter->streamName, + $this->streamFilter->aggregateType, + $this->streamFilter->eventStoreReferenceName, + $this->streamFilter->eventNames, + ]), Reference::to(PdoStreamTableNameProvider::class), 5_000, new Definition(Duration::class, [60], 'seconds'), From 18dd3007f7bb1682247212c4b834cd10371414b8 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Fri, 9 Jan 2026 18:00:30 +0100 Subject: [PATCH 09/10] fixes --- .../tests/Projecting/GapAwarePositionIntegrationTest.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php index 4f0e17308..c24337882 100644 --- a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php @@ -21,6 +21,7 @@ use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; +use Ecotone\Projecting\StreamFilter; use Ecotone\Test\LicenceTesting; use Enqueue\Dbal\DbalConnectionFactory; use Psr\Clock\ClockInterface; @@ -113,7 +114,7 @@ public function test_max_gap_offset_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 3, // Only keep gaps within 3 positions gapTimeout: null @@ -151,7 +152,7 @@ public function test_gap_timeout_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, gapTimeout: Duration::seconds(5) ); @@ -185,7 +186,7 @@ public function test_gap_cleaning_noop_when_no_gaps(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: Duration::seconds(5) @@ -207,7 +208,7 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: null // No timeout From 48289714248174e5217d4db34ded417d2623d1e4 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Fri, 9 Jan 2026 19:39:26 +0100 Subject: [PATCH 10/10] fixes --- .../EcotoneConfiguration.php | 22 ------------------- ...verTimeProjectionWithEcotoneProjection.php | 6 ++++- 2 files changed, 5 insertions(+), 23 deletions(-) delete mode 100644 Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php diff --git a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php b/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php deleted file mode 100644 index 9d682ce92..000000000 --- a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php +++ /dev/null @@ -1,22 +0,0 @@ -