Skip to content

Commit 785a86c

Browse files
committed
feat: partitioned multiple stream support
1 parent fb35463 commit 785a86c

17 files changed

+519
-98
lines changed

packages/Ecotone/src/Projecting/BackfillExecutorHandler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function executeBackfillBatch(
4646
$streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName);
4747

4848
foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) {
49-
$projectingManager->execute($partition, true);
49+
$projectingManager->execute($partition, $streamName, true);
5050
if ($this->terminationListener->shouldTerminate()) {
5151
break;
5252
}

packages/Ecotone/src/Projecting/Config/ProjectingModule.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
100100
->chainInterceptedProcessor(
101101
MethodInvokerBuilder::create(
102102
$projectingManagerReference,
103-
InterfaceToCallReference::create(ProjectingManager::class, 'execute'),
103+
InterfaceToCallReference::create(ProjectingManager::class, 'executeAllStreams'),
104104
[
105105
$projectionBuilder->partitionHeader()
106106
? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader())

packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,12 @@ public function canHandle(string $projectionName): bool
4141
return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true);
4242
}
4343

44-
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
44+
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
4545
{
46-
// Position is 0-based index into the global event array (like InMemoryStreamSource)
4746
$from = $lastPosition !== null ? (int) $lastPosition : 0;
4847

49-
// Determine which streams to read from
50-
$streams = $this->getStreamsToRead();
48+
$streams = $this->getStreamsToRead($streamName);
5149

52-
// Collect all events from all streams
5350
$allEvents = [];
5451
foreach ($streams as $stream) {
5552
if (! $this->eventStore->hasStream($stream)) {
@@ -62,36 +59,37 @@ public function load(string $projectionName, ?string $lastPosition, int $count,
6259
->withMetadataMatch($this->partitionHeader, Operator::EQUALS, $partitionKey);
6360
}
6461

65-
// Filter by event names if specified (optimization for partitioned projections)
6662
if ($this->eventNames !== []) {
6763
$metadataMatcher = $metadataMatcher
6864
->withMetadataMatch('event_name', Operator::IN, $this->eventNames, FieldType::MESSAGE_PROPERTY);
6965
}
7066

71-
// Load all events from this stream (starting from position 1)
7267
$events = $this->eventStore->load($stream, 1, null, $metadataMatcher);
7368
$allEvents = array_merge($allEvents, is_array($events) ? $events : iterator_to_array($events));
7469
}
7570

76-
// Slice based on global position
7771
$events = array_slice($allEvents, $from, $count);
7872
$to = $from + count($events);
7973

8074
return new StreamPage($events, (string) $to);
8175
}
8276

83-
private function getStreamsToRead(): array
77+
/**
78+
* @return array<string>
79+
*/
80+
private function getStreamsToRead(string $streamName): array
8481
{
82+
if ($streamName !== '') {
83+
return [$streamName];
84+
}
85+
8586
if ($this->streamName !== null) {
8687
return [$this->streamName];
8788
}
8889

89-
// Read from all streams (global stream)
9090
$reflection = new ReflectionProperty($this->eventStore, 'streams');
91-
$reflection->setAccessible(true);
9291
$allStreams = array_keys($reflection->getValue($this->eventStore));
9392

94-
// Filter out internal streams (starting with $)
9593
return array_filter($allStreams, fn ($stream) => ! str_starts_with($stream, '$'));
9694
}
9795
}

packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,18 @@ public function canHandle(string $projectionName): bool
3535
return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true);
3636
}
3737

38-
public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState
38+
public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState
3939
{
40-
$key = $this->getKey($projectionName, $partitionKey);
40+
$key = $this->getKey($projectionName, $partitionKey, $streamName);
4141
return $this->projectionStates[$key] ?? null;
4242
}
4343

44-
public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState
44+
public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState
4545
{
46-
$key = $this->getKey($projectionName, $partitionKey);
46+
$key = $this->getKey($projectionName, $partitionKey, $streamName);
4747

4848
if (! isset($this->projectionStates[$key])) {
49-
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED);
49+
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED);
5050
return $this->projectionStates[$key];
5151
}
5252

@@ -55,21 +55,25 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu
5555

5656
public function savePartition(ProjectionPartitionState $projectionState): void
5757
{
58-
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey);
58+
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey, $projectionState->streamName);
5959
$this->projectionStates[$key] = $projectionState;
6060
}
6161

62-
private function getKey(string $projectionName, ?string $partitionKey): string
62+
private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string
6363
{
64-
if ($partitionKey === null) {
65-
return $projectionName;
64+
$key = $projectionName;
65+
if ($streamName !== '') {
66+
$key .= '::' . $streamName;
6667
}
67-
return $projectionName . '-' . $partitionKey;
68+
if ($partitionKey !== null) {
69+
$key .= '-' . $partitionKey;
70+
}
71+
return $key;
6872
}
6973

7074
public function delete(string $projectionName): void
7175
{
72-
$projectionStartKey = $this->getKey($projectionName, null);
76+
$projectionStartKey = $projectionName;
7377
foreach ($this->projectionStates as $key => $value) {
7478
if (str_starts_with($key, $projectionStartKey)) {
7579
unset($this->projectionStates[$key]);

packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public function append(Event ...$events): void
3939
}
4040
}
4141

42-
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
42+
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
4343
{
4444
$from = $lastPosition !== null ? (int) $lastPosition : 0;
4545

packages/Ecotone/src/Projecting/ProjectingManager.php

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,28 +48,36 @@ private function getProjectionStateStorage(): ProjectionStateStorage
4848
return $this->projectionStateStorage;
4949
}
5050

51-
public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void
51+
public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void
5252
{
5353
do {
54-
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $manualInitialization || $this->automaticInitialization);
54+
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization);
5555
} while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true);
5656
}
5757

58+
public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void
59+
{
60+
$streamFilters = $this->streamFilterRegistry->provide($this->projectionName);
61+
foreach ($streamFilters as $streamFilter) {
62+
$this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization);
63+
}
64+
}
65+
5866
/**
5967
* @return int Number of processed events
6068
*/
61-
private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int
69+
private function executeSingleBatch(?string $partitionKeyValue, string $streamName, bool $canInitialize): int
6270
{
6371
$transaction = $this->getProjectionStateStorage()->beginTransaction();
6472
try {
65-
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize);
73+
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $streamName, $canInitialize);
6674
if ($projectionState === null) {
6775
$transaction->commit();
6876
return 0;
6977
}
7078

7179
$streamSource = $this->streamSourceRegistry->getFor($this->projectionName);
72-
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue);
80+
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue, $streamName);
7381

7482
$userState = $projectionState->userState;
7583
$processedEvents = 0;
@@ -86,7 +94,6 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial
8694
->withUserState($userState);
8795

8896
if ($processedEvents === 0 && $canInitialize) {
89-
// If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized
9097
$projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED);
9198
}
9299

@@ -99,9 +106,9 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial
99106
}
100107
}
101108

102-
public function loadState(?string $partitionKey = null): ProjectionPartitionState
109+
public function loadState(?string $partitionKey, string $streamName): ProjectionPartitionState
103110
{
104-
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey);
111+
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey, $streamName);
105112
}
106113

107114
public function getPartitionProvider(): PartitionProvider
@@ -193,10 +200,10 @@ public function backfill(): void
193200
$this->prepareBackfill();
194201
}
195202

196-
private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState
203+
private function loadOrInitializePartitionState(?string $partitionKey, string $streamName, bool $canInitialize): ?ProjectionPartitionState
197204
{
198205
$storage = $this->getProjectionStateStorage();
199-
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
206+
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);
200207

201208
if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) {
202209
return null;
@@ -206,11 +213,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can
206213
}
207214

208215
if ($canInitialize) {
209-
$projectionState = $storage->initPartition($this->projectionName, $partitionKey);
216+
$projectionState = $storage->initPartition($this->projectionName, $partitionKey, $streamName);
210217
if ($projectionState) {
211218
$this->projectorExecutor->init();
212219
} else {
213-
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
220+
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);
214221
}
215222
return $projectionState;
216223
}

packages/Ecotone/src/Projecting/ProjectionPartitionState.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class ProjectionPartitionState
1212
public function __construct(
1313
public readonly string $projectionName,
1414
public readonly ?string $partitionKey,
15+
public readonly string $streamName,
1516
public readonly ?string $lastPosition = null,
1617
public readonly mixed $userState = null,
1718
public readonly ?ProjectionInitializationStatus $status = null,
@@ -20,16 +21,16 @@ public function __construct(
2021

2122
public function withLastPosition(string $lastPosition): self
2223
{
23-
return new self($this->projectionName, $this->partitionKey, $lastPosition, $this->userState, $this->status);
24+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $lastPosition, $this->userState, $this->status);
2425
}
2526

2627
public function withUserState(mixed $userState): self
2728
{
28-
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $userState, $this->status);
29+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $userState, $this->status);
2930
}
3031

3132
public function withStatus(ProjectionInitializationStatus $status): self
3233
{
33-
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $this->userState, $status);
34+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $this->userState, $status);
3435
}
3536
}

packages/Ecotone/src/Projecting/ProjectionStateStorage.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
interface ProjectionStateStorage
1111
{
1212
public function canHandle(string $projectionName): bool;
13-
public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState;
14-
public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState;
13+
public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState;
14+
public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState;
1515
public function savePartition(ProjectionPartitionState $projectionState): void;
1616
public function delete(string $projectionName): void;
1717
public function init(string $projectionName): void;

packages/Ecotone/src/Projecting/StreamSource.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ interface StreamSource
1111
{
1212
public function canHandle(string $projectionName): bool;
1313

14-
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage;
14+
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage;
1515
}

packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public function canHandle(string $projectionName): bool
110110
return true;
111111
}
112112

113-
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
113+
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
114114
{
115115
$from = $lastPosition !== null ? (int) $lastPosition : 0;
116116
$events = array_slice($this->events, $from, $count);

0 commit comments

Comments
 (0)