Skip to content

Commit eb36c82

Browse files
authored
Feat: partition projections optimalization (#596)
* feat: partition projection optimalization * refactor * flaky tests * fix phpstan
1 parent c0fdccc commit eb36c82

18 files changed

+619
-80
lines changed

packages/Amqp/tests/AmqpMessagingTestCase.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use AMQPQueueException;
66
use Ecotone\Amqp\Distribution\AmqpDistributionModule;
7+
use Ecotone\Enqueue\CachedConnectionFactory;
78
use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnection;
89
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnection;
910
use Interop\Amqp\AmqpConnectionFactory;
@@ -75,6 +76,9 @@ public static function getRabbitConnectionFactory(array $config = []): AmqpConne
7576

7677
public function setUp(): void
7778
{
79+
// Clear cached connection factories to prevent channel mode conflicts between tests
80+
// (e.g., confirm mode vs transaction mode on the same channel)
81+
CachedConnectionFactory::clearInstances();
7882
// Ensure cache directory is writable for tests
7983
$this->queueCleanUp();
8084
}

packages/Amqp/tests/Integration/SuccessTransactionTest.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ final class SuccessTransactionTest extends AmqpMessagingTestCase
2020
{
2121
public function test_order_is_placed_when_transaction_is_successful(): void
2222
{
23-
if (getenv('AMQP_IMPLEMENTATION') === 'lib') {
24-
$this->markTestSkipped('Transaction tests require Ext');
25-
}
26-
2723
$ecotone = $this->bootstrapFlowTesting(
2824
containerOrAvailableServices: [new OrderService(), ...$this->getConnectionFactoryReferences()],
2925
configuration: ServiceConfiguration::createWithDefaults()

packages/Ecotone/src/Projecting/Config/ProjectingModule.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
6767
/** @var array<string, array<string, string>> $components [projection name][component name][reference] */
6868
$components = [];
6969
foreach ($componentBuilders as $componentBuilder) {
70-
$reference = Uuid::uuid4()->toString();
71-
$moduleReferenceSearchService->store($reference, $componentBuilder);
7270
foreach ($projectionBuilders as $projectionBuilder) {
7371
$projectionName = $projectionBuilder->projectionName();
7472
foreach ([StreamSource::class, PartitionProvider::class, ProjectionStateStorage::class] as $component) {
@@ -79,6 +77,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
7977
. ' You can only register one component of each type per projection. Please check your configuration.'
8078
);
8179
}
80+
81+
$reference = Uuid::uuid4()->toString();
82+
$moduleReferenceSearchService->store($reference, $componentBuilder);
8283
$components[$projectionName][$component] = new Reference($reference);
8384
}
8485
}

packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreAdapterModule.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration,
7272
continue;
7373
}
7474

75-
$extensions[] = new EventStoreChannelAdapterProjectionBuilder($extensionObject);
75+
$extensions[] = new EventStoreStreamingChannelAdapterBuilder($extensionObject);
7676
}
7777

7878
return $extensions;

packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjectionBuilder.php renamed to packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
* @internal
2020
*/
21-
class EventStoreChannelAdapterProjectionBuilder implements ProjectionExecutorBuilder
21+
class EventStoreStreamingChannelAdapterBuilder implements ProjectionExecutorBuilder
2222
{
2323
public function __construct(
2424
private EventStoreChannelAdapter $channelAdapter

packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
namespace Ecotone\Projecting\InMemory;
99

10+
use Ecotone\EventSourcing\EventStore\FieldType;
1011
use Ecotone\EventSourcing\EventStore\InMemoryEventStore;
1112
use Ecotone\EventSourcing\EventStore\MetadataMatcher;
1213
use Ecotone\EventSourcing\EventStore\Operator;
@@ -16,10 +17,14 @@
1617

1718
class InMemoryEventStoreStreamSource implements StreamSource
1819
{
20+
/**
21+
* @param array<string> $eventNames Event names to filter by, empty array means no filtering
22+
*/
1923
public function __construct(
2024
private InMemoryEventStore $eventStore,
2125
private ?string $streamName = null,
22-
private ?string $partitionHeader = null
26+
private ?string $partitionHeader = null,
27+
private array $eventNames = [],
2328
) {
2429
}
2530

@@ -38,20 +43,26 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =
3843
continue;
3944
}
4045

41-
$metadataMatcher = null;
46+
$metadataMatcher = new MetadataMatcher();
4247
if ($partitionKey !== null && $this->partitionHeader !== null) {
43-
$metadataMatcher = (new MetadataMatcher())
48+
$metadataMatcher = $metadataMatcher
4449
->withMetadataMatch($this->partitionHeader, Operator::EQUALS, $partitionKey);
4550
}
4651

52+
// Filter by event names if specified (optimization for partitioned projections)
53+
if ($this->eventNames !== []) {
54+
$metadataMatcher = $metadataMatcher
55+
->withMetadataMatch('event_name', Operator::IN, $this->eventNames, FieldType::MESSAGE_PROPERTY);
56+
}
57+
4758
// Load all events from this stream (starting from position 1)
4859
$events = $this->eventStore->load($stream, 1, null, $metadataMatcher);
49-
$allEvents = array_merge($allEvents, is_array($events) ? $events : iterator_to_array($events));
60+
$allEvents = array_merge($allEvents, \is_array($events) ? $events : iterator_to_array($events));
5061
}
5162

5263
// Slice based on global position
5364
$events = array_slice($allEvents, $from, $count);
54-
$to = $from + count($events);
65+
$to = $from + \count($events);
5566

5667
return new StreamPage($events, (string) $to);
5768
}

packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
class InMemoryEventStoreStreamSourceBuilder implements ProjectionComponentBuilder
1818
{
19+
/**
20+
* @param array<string> $eventNames Event names to filter by, empty array means no filtering
21+
*/
1922
public function __construct(
2023
private ?array $projectionNames = null,
2124
private ?string $streamName = null,
22-
private ?string $partitionHeader = null
25+
private ?string $partitionHeader = null,
26+
private array $eventNames = [],
2327
) {
2428
}
2529

2630
public function canHandle(string $projectionName, string $component): bool
2731
{
2832
return $component === StreamSource::class
29-
&& ($this->projectionNames === null || in_array($projectionName, $this->projectionNames, true));
33+
&& ($this->projectionNames === null || \in_array($projectionName, $this->projectionNames, true));
3034
}
3135

3236
public function compile(MessagingContainerBuilder $builder): Definition|Reference
@@ -37,6 +41,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
3741
Reference::to(InMemoryEventStore::class),
3842
$this->streamName,
3943
$this->partitionHeader,
44+
$this->eventNames,
4045
]
4146
);
4247
}

packages/Enqueue/src/CachedConnectionFactory.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ public function reconnect(): void
5151
$this->cachedContext = [];
5252
}
5353

54+
/**
55+
* Clear all cached connection factory instances.
56+
* This is useful in tests to ensure clean state between test runs,
57+
* especially when switching between different channel modes (confirm vs tx).
58+
*/
59+
public static function clearInstances(): void
60+
{
61+
self::$instances = [];
62+
}
63+
5464
public function getConsumer(Destination $destination): Consumer
5565
{
5666
return $this->createContext()->createConsumer($destination);

0 commit comments

Comments
 (0)