Skip to content

Commit 8330ffa

Browse files
committed
feat: partitioned multi stream projections
1 parent fb35463 commit 8330ffa

16 files changed

+969
-68
lines changed

.claude/settings.local.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(grep:*)",
5+
"Bash(./vendor/bin/phpunit:*)",
6+
"Bash(docker exec:*)"
7+
]
8+
}
9+
}

packages/Ecotone/src/Projecting/BackfillExecutorHandler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function executeBackfillBatch(
4646
$streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName);
4747

4848
foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) {
49-
$projectingManager->execute($partition, true);
49+
$projectingManager->execute($partition, true, $streamName);
5050
if ($this->terminationListener->shouldTerminate()) {
5151
break;
5252
}

packages/Ecotone/src/Projecting/Config/ProjectingModule.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
106106
? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader())
107107
: ValueBuilder::create('partitionKeyValue', null),
108108
HeaderBuilder::createOptional('manualInitialization', ProjectingHeaders::MANUAL_INITIALIZATION),
109+
ValueBuilder::create('streamName', null),
109110
],
110111
)
111112
)

packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,18 @@ public function canHandle(string $projectionName): bool
3535
return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true);
3636
}
3737

38-
public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState
38+
public function loadPartition(string $projectionName, ?string $partitionKey = null, ?string $streamName = null, bool $lock = true): ?ProjectionPartitionState
3939
{
40-
$key = $this->getKey($projectionName, $partitionKey);
40+
$key = $this->getKey($projectionName, $partitionKey, $streamName);
4141
return $this->projectionStates[$key] ?? null;
4242
}
4343

44-
public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState
44+
public function initPartition(string $projectionName, ?string $partitionKey = null, ?string $streamName = null): ?ProjectionPartitionState
4545
{
46-
$key = $this->getKey($projectionName, $partitionKey);
46+
$key = $this->getKey($projectionName, $partitionKey, $streamName);
4747

4848
if (! isset($this->projectionStates[$key])) {
49-
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED);
49+
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED);
5050
return $this->projectionStates[$key];
5151
}
5252

@@ -55,16 +55,19 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu
5555

5656
public function savePartition(ProjectionPartitionState $projectionState): void
5757
{
58-
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey);
58+
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey, $projectionState->streamName);
5959
$this->projectionStates[$key] = $projectionState;
6060
}
6161

62-
private function getKey(string $projectionName, ?string $partitionKey): string
62+
private function getKey(string $projectionName, ?string $partitionKey, ?string $streamName = null): string
6363
{
64-
if ($partitionKey === null) {
64+
if ($partitionKey === null && $streamName === null) {
6565
return $projectionName;
6666
}
67-
return $projectionName . '-' . $partitionKey;
67+
if ($streamName === null) {
68+
return $projectionName . '-' . $partitionKey;
69+
}
70+
return $projectionName . '-' . ($partitionKey ?? '') . '-' . $streamName;
6871
}
6972

7073
public function delete(string $projectionName): void

packages/Ecotone/src/Projecting/ProjectingManager.php

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,28 +48,45 @@ private function getProjectionStateStorage(): ProjectionStateStorage
4848
return $this->projectionStateStorage;
4949
}
5050

51-
public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void
51+
public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false, ?string $streamName = null): void
52+
{
53+
$streamFilters = $this->streamFilterRegistry->provide($this->projectionName);
54+
55+
if ($streamName !== null) {
56+
$this->executeForStream($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization);
57+
return;
58+
}
59+
60+
foreach ($streamFilters as $streamFilter) {
61+
$this->executeForStream($partitionKeyValue, $streamFilter->streamName, $manualInitialization || $this->automaticInitialization);
62+
if ($this->terminationListener->shouldTerminate()) {
63+
break;
64+
}
65+
}
66+
}
67+
68+
private function executeForStream(?string $partitionKeyValue, string $streamName, bool $canInitialize): void
5269
{
5370
do {
54-
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $manualInitialization || $this->automaticInitialization);
71+
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $canInitialize);
5572
} while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true);
5673
}
5774

5875
/**
5976
* @return int Number of processed events
6077
*/
61-
private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int
78+
private function executeSingleBatch(?string $partitionKeyValue, string $streamName, bool $canInitialize): int
6279
{
6380
$transaction = $this->getProjectionStateStorage()->beginTransaction();
6481
try {
65-
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize);
82+
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $streamName, $canInitialize);
6683
if ($projectionState === null) {
6784
$transaction->commit();
6885
return 0;
6986
}
7087

7188
$streamSource = $this->streamSourceRegistry->getFor($this->projectionName);
72-
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue);
89+
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue, $streamName);
7390

7491
$userState = $projectionState->userState;
7592
$processedEvents = 0;
@@ -99,9 +116,9 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial
99116
}
100117
}
101118

102-
public function loadState(?string $partitionKey = null): ProjectionPartitionState
119+
public function loadState(?string $partitionKey = null, ?string $streamName = null): ProjectionPartitionState
103120
{
104-
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey);
121+
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey, $streamName);
105122
}
106123

107124
public function getPartitionProvider(): PartitionProvider
@@ -193,10 +210,10 @@ public function backfill(): void
193210
$this->prepareBackfill();
194211
}
195212

196-
private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState
213+
private function loadOrInitializePartitionState(?string $partitionKey, string $streamName, bool $canInitialize): ?ProjectionPartitionState
197214
{
198215
$storage = $this->getProjectionStateStorage();
199-
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
216+
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);
200217

201218
if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) {
202219
return null;
@@ -206,11 +223,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can
206223
}
207224

208225
if ($canInitialize) {
209-
$projectionState = $storage->initPartition($this->projectionName, $partitionKey);
226+
$projectionState = $storage->initPartition($this->projectionName, $partitionKey, $streamName);
210227
if ($projectionState) {
211228
$this->projectorExecutor->init();
212229
} else {
213-
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
230+
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);
214231
}
215232
return $projectionState;
216233
}

packages/Ecotone/src/Projecting/ProjectionPartitionState.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class ProjectionPartitionState
1212
public function __construct(
1313
public readonly string $projectionName,
1414
public readonly ?string $partitionKey,
15+
public readonly ?string $streamName = null,
1516
public readonly ?string $lastPosition = null,
1617
public readonly mixed $userState = null,
1718
public readonly ?ProjectionInitializationStatus $status = null,
@@ -20,16 +21,16 @@ public function __construct(
2021

2122
public function withLastPosition(string $lastPosition): self
2223
{
23-
return new self($this->projectionName, $this->partitionKey, $lastPosition, $this->userState, $this->status);
24+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $lastPosition, $this->userState, $this->status);
2425
}
2526

2627
public function withUserState(mixed $userState): self
2728
{
28-
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $userState, $this->status);
29+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $userState, $this->status);
2930
}
3031

3132
public function withStatus(ProjectionInitializationStatus $status): self
3233
{
33-
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $this->userState, $status);
34+
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $this->userState, $status);
3435
}
3536
}

packages/Ecotone/src/Projecting/ProjectionStateStorage.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
interface ProjectionStateStorage
1111
{
1212
public function canHandle(string $projectionName): bool;
13-
public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState;
14-
public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState;
13+
public function loadPartition(string $projectionName, ?string $partitionKey = null, ?string $streamName = null, bool $lock = true): ?ProjectionPartitionState;
14+
public function initPartition(string $projectionName, ?string $partitionKey = null, ?string $streamName = null): ?ProjectionPartitionState;
1515
public function savePartition(ProjectionPartitionState $projectionState): void;
1616
public function delete(string $projectionName): void;
1717
public function init(string $projectionName): void;

packages/Ecotone/src/Projecting/StreamSource.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ interface StreamSource
1111
{
1212
public function canHandle(string $projectionName): bool;
1313

14-
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage;
14+
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null, ?string $streamName = null): StreamPage;
1515
}

packages/PdoEventSourcing/src/Config/ProophProjectingModule.php

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,6 @@ private static function resolveProjectionTypes(
107107
$partitionedAttribute = $annotationRegistrationService->findAttributeForClass($projectionClass, Partitioned::class);
108108
$isPartitioned = $partitionedAttribute !== null;
109109

110-
if ($isPartitioned && count($streamFilters) > 1) {
111-
throw ConfigurationException::create(
112-
"Partitioned projection {$projectionName} cannot declare multiple streams. Use a single aggregate stream or remove #[Partitioned]."
113-
);
114-
}
115-
116110
foreach ($streamFilters as $streamFilter) {
117111
if ($isPartitioned && ! $streamFilter->aggregateType) {
118112
throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided");

packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,11 @@ private function getPostgresCreateSql(): string
100100
CREATE TABLE IF NOT EXISTS {$this->tableName} (
101101
projection_name VARCHAR(255) NOT NULL,
102102
partition_key VARCHAR(255) NOT NULL DEFAULT '',
103+
stream_name VARCHAR(255) NOT NULL DEFAULT '',
103104
last_position TEXT NOT NULL,
104105
metadata JSON NOT NULL,
105106
user_state JSON,
106-
PRIMARY KEY (projection_name, partition_key)
107+
PRIMARY KEY (projection_name, partition_key, stream_name)
107108
)
108109
SQL;
109110
}
@@ -114,10 +115,11 @@ private function getMysqlCreateSql(): string
114115
CREATE TABLE IF NOT EXISTS `{$this->tableName}` (
115116
`projection_name` VARCHAR(255) NOT NULL,
116117
`partition_key` VARCHAR(255) NOT NULL DEFAULT '',
118+
`stream_name` VARCHAR(255) NOT NULL DEFAULT '',
117119
`last_position` TEXT NOT NULL,
118120
`metadata` JSON NOT NULL,
119121
`user_state` JSON,
120-
PRIMARY KEY (`projection_name`, `partition_key`)
122+
PRIMARY KEY (`projection_name`, `partition_key`, `stream_name`)
121123
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
122124
SQL;
123125
}

0 commit comments

Comments
 (0)