Projecting: handle multiple prooph's streams #592
dgafka merged 20 commits into ecotoneframework:main
| $events = array_map(fn(array $tuple) => $tuple[1], $all); | ||
| return new StreamPage($events, $this->encodePositions($newPositions)); |
For a single stream we should encode positions the same way, so that if someone adds one more FromStream to an existing projection, it still works.
Right, I will do that in another PR, so I can add some migration code for the existing encoding.
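One way to make single-stream and multi-stream positions interchangeable is to always store a map of stream name to position, and to fall back gracefully when an older value is read. The sketch below assumes a JSON map as the new format and a plain integer as the legacy single-stream format; the function names, the `$legacyStream` parameter, and both formats are hypothetical and not taken from this PR.

```php
<?php
// Hypothetical helpers: names and storage formats are assumptions for illustration only.

/** @param array<string,int> $positions stream name => last processed position */
function encodePositions(array $positions): string
{
    ksort($positions); // deterministic output regardless of stream declaration order
    return json_encode($positions, JSON_FORCE_OBJECT | JSON_THROW_ON_ERROR);
}

/** @return array<string,int> stream name => last processed position */
function decodePositions(?string $encoded, string $legacyStream): array
{
    if ($encoded === null || $encoded === '') {
        return []; // nothing processed yet
    }
    if (ctype_digit($encoded)) {
        // Assumed legacy single-stream state: a bare integer position.
        return [$legacyStream => (int) $encoded];
    }
    return array_map('intval', json_decode($encoded, true, 512, JSON_THROW_ON_ERROR));
}
```

With this shape, adding a second stream to an existing projection only adds a key to the map; the position already stored for the first stream is preserved.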
| if ($partitionHeaderName !== null) { | ||
| if (count($streamAttributes) > 1) { | ||
| throw ConfigurationException::create("Projection {$projectionName} cannot be partitioned by aggregate id when multiple streams are configured"); | ||
| } |
If there is no ordering guaranteed between different event logs, then what is the blocker for partition-based projections?
We would still guarantee order within a given partition, which lives inside a specific stream.
There is no blocker; we can add it in a follow-up PR.
The blocker is the partition provider: in case of multiple streams, we have to find a way to merge partitions from all streams. I may do it in a following PR.
Please hold off on changes to partition-based projections: I am working on enabling asynchronous batched rebuild, which will require changes to the Partition Provider.
| $extensions[] = new EventStoreMultiStreamSourceBuilder( | ||
| $map, | ||
| [$projectionName], | ||
| ); |
To simplify, we could have EventStoreMultiStreamSourceBuilder::create([$streamTrackerBuilders]); if only a single builder is passed, we simply return it (no need for the if/else).
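The suggestion above can be sketched as a static factory that collapses to the single builder when only one is given. The interface and both classes below are hypothetical stand-ins written for this example; the real builder types in the package may look different.

```php
<?php
// Hypothetical stand-ins for illustration; not the package's actual classes.
interface StreamSourceBuilder
{
    public function describe(): string;
}

final class SingleStreamSourceBuilder implements StreamSourceBuilder
{
    public function __construct(private string $stream) {}

    public function describe(): string
    {
        return "single({$this->stream})";
    }
}

final class MultiStreamSourceBuilder implements StreamSourceBuilder
{
    /** @param array<string, StreamSourceBuilder> $builders stream name => builder */
    private function __construct(private array $builders) {}

    /** @param array<string, StreamSourceBuilder> $builders stream name => builder */
    public static function create(array $builders): StreamSourceBuilder
    {
        if ($builders === []) {
            throw new InvalidArgumentException('At least one stream source builder is required');
        }
        if (count($builders) === 1) {
            // A single stream needs no merging, so hand back the inner builder as-is.
            return array_values($builders)[0];
        }
        return new self($builders);
    }

    public function describe(): string
    {
        return 'multi(' . implode(',', array_keys($this->builders)) . ')';
    }
}
```

The caller then always invokes `create()` and never branches on the number of configured streams.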
packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php
packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php
# Conflicts:
#   packages/PdoEventSourcing/src/Config/ProophProjectingModule.php
#   packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php
ff0ca76 to 77f384d
| * @return ($attributeClassName is null ? list<object> : list<T>) | ||
| */ | ||
| public function getAnnotationsForClass(string $className): array; | ||
| public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array; |
I've added this parameter to allow finding repeated attributes on the same class.
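For context, PHP's reflection API can already filter attributes by class name, which is what makes repeated attributes easy to collect. The sketch below is not the framework's implementation: `FromStreamExample`, `ExampleProjection`, and `annotationsForClass` are hypothetical names, and only `ReflectionClass::getAttributes()` and `ReflectionAttribute::newInstance()` are real PHP 8 calls.

```php
<?php
// FromStreamExample is a hypothetical repeatable attribute defined only for this sketch.
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
final class FromStreamExample
{
    public function __construct(public string $stream) {}
}

#[FromStreamExample('orders')]
#[FromStreamExample('payments')]
final class ExampleProjection {}

/**
 * Hypothetical helper mirroring the new optional parameter.
 *
 * @param class-string      $className
 * @param class-string|null $attributeClassName when null, all attributes are returned
 * @return list<object>
 */
function annotationsForClass(string $className, ?string $attributeClassName = null): array
{
    $reflection = new ReflectionClass($className);
    // IS_INSTANCEOF also matches subclasses of the requested attribute class.
    $attributes = $attributeClassName === null
        ? $reflection->getAttributes()
        : $reflection->getAttributes($attributeClassName, ReflectionAttribute::IS_INSTANCEOF);

    return array_map(
        static fn (ReflectionAttribute $attribute): object => $attribute->newInstance(),
        $attributes
    );
}
```

Calling `annotationsForClass(ExampleProjection::class, FromStreamExample::class)` returns one instance per declaration, in source order.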
| foreach ($this->sources as $stream => $source) { | ||
| $orderIndex[$stream] = $i++; | ||
| $limit = (int)ceil($count / max(1, count($this->sources))) + 5; |
Where does this magic 5 come from? :)
From AI :)
This is the minimum number of events to fetch per stream; I will add a constant.
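A minimal sketch of the same formula with the literal pulled into a named constant. `MIN_EVENTS_PER_STREAM` and `perStreamLimit` are hypothetical names chosen for this example; the constant eventually added to the PR may be named differently.

```php
<?php
// Hypothetical constant name for the "magic 5" discussed above.
const MIN_EVENTS_PER_STREAM = 5;

/**
 * Splits the requested batch size evenly across streams and pads each share,
 * so that every stream is asked for at least MIN_EVENTS_PER_STREAM events.
 */
function perStreamLimit(int $batchSize, int $streamCount): int
{
    // max(1, ...) guards against division by zero when no streams are configured.
    return (int) ceil($batchSize / max(1, $streamCount)) + MIN_EVENTS_PER_STREAM;
}
```

For a batch size of 100 across 3 streams this gives 39 per stream, so up to 117 events may be loaded in one pass, which is one reason the effective batch size can exceed the requested one.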
| if ($a->timestamp === $b->timestamp) { | ||
| return $orderIndex[$aStream] <=> $orderIndex[$bStream]; | ||
| } | ||
| return $a->timestamp <=> $b->timestamp; |
I wonder whether we should bother ordering at all if this is not a true order: the result depends entirely on the positions in each stream. For example, in one load batch we may end up sorting events from 2024 together with events from 2026.
In non-rebuild scenarios it may give end users the impression that events are somehow sorted, but they really are not, and a rebuild of the projection will expose every assumption like this.
In Kafka, when joining different streams, there is no order guarantee, and no sorting like this is done. If you want to join streams, design the system so that it does not depend on the order between them (basically, treat them like separate partitions).
But this is not a blocker if you feel the ordering makes sense.
This is for prooph compatibility.
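To make the comparison in the diff concrete, here is a self-contained sketch of merging one batch from several streams by timestamp, breaking ties by the order in which the streams were declared. `ExampleEvent` and `mergeByTimestamp` are hypothetical names, and integer timestamps are an assumption made to keep the example short.

```php
<?php
// Hypothetical event shape for illustration; not the PR's PersistedProophEvent.
final class ExampleEvent
{
    public function __construct(
        public string $stream,
        public int $timestamp,
        public string $name,
    ) {}
}

/**
 * @param array<string, list<ExampleEvent>> $eventsByStream keyed by stream name, in declaration order
 * @return list<ExampleEvent>
 */
function mergeByTimestamp(array $eventsByStream): array
{
    // Stream name => declaration index, used only to break timestamp ties.
    $orderIndex = array_flip(array_keys($eventsByStream));
    $all = array_merge(...array_values($eventsByStream));

    // usort is stable since PHP 8.0, so events of one stream that share a
    // timestamp keep their original relative order.
    usort($all, static function (ExampleEvent $a, ExampleEvent $b) use ($orderIndex): int {
        if ($a->timestamp === $b->timestamp) {
            return $orderIndex[$a->stream] <=> $orderIndex[$b->stream];
        }
        return $a->timestamp <=> $b->timestamp;
    });

    return $all;
}
```

As noted in the review, this orders events only within the loaded batch; it does not establish a global order across streams.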
Why is this change proposed?
Handle projecting from multiple prooph streams.
Description of Changes
created_at column
The requested batch size is handled on a best-effort basis: it may be smaller or larger depending on how events are distributed across streams.