2020use Ecotone \EventSourcing \Projecting \PartitionState \DbalProjectionStateStorageBuilder ;
2121use Ecotone \EventSourcing \Projecting \StreamSource \EventStoreAggregateStreamSourceBuilder ;
2222use Ecotone \EventSourcing \Projecting \StreamSource \EventStoreGlobalStreamSourceBuilder ;
23+ use Ecotone \EventSourcing \Projecting \StreamSource \EventStoreMultiStreamSourceBuilder ;
2324use Ecotone \Messaging \Attribute \ModuleAnnotation ;
2425use Ecotone \Messaging \Config \Annotation \AnnotationModule ;
2526use Ecotone \Messaging \Config \Annotation \ModuleConfiguration \ExtensionObjectResolver ;
4041use Ecotone \Projecting \EventStoreAdapter \EventStreamingChannelAdapter ;
4142
4243#[ModuleAnnotation]
43- /**
44- * @phpstan-type ProjectionConfiguration array{projectionName: string, streamName: string, aggregateType: ?string, isPartitioned: bool, eventNames: array}
45- */
4644class ProophProjectingModule implements AnnotationModule
4745{
4846 /**
@@ -57,8 +55,6 @@ public function __construct(
5755
5856 public static function create (AnnotationFinder $ annotationRegistrationService , InterfaceToCallRegistry $ interfaceToCallRegistry ): static
5957 {
60- $ extensions = [];
61-
6258 $ namedEvents = [];
6359 foreach ($ annotationRegistrationService ->findAnnotatedClasses (NamedEvent::class) as $ className ) {
6460 $ attribute = $ annotationRegistrationService ->getAttributeForClass ($ className , NamedEvent::class);
@@ -67,14 +63,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
6763
6864 $ projectionEventNames = self ::collectProjectionEventNames ($ annotationRegistrationService , $ interfaceToCallRegistry , $ namedEvents );
6965
70- $ resolvedConfigs = [
71- ...self ::resolveFromStreamConfigs ($ annotationRegistrationService , $ projectionEventNames ),
72- ...self ::resolveFromAggregateStream ($ annotationRegistrationService , $ projectionEventNames ),
73- ];
74-
75- foreach ($ resolvedConfigs as $ config ) {
76- $ extensions = [...$ extensions , ...self ::createStreamSourceExtensions ($ config )];
77- }
66+ $ extensions = self ::resolveConfigs ($ annotationRegistrationService , $ projectionEventNames );
7867
7968 $ projectionNames = [];
8069 foreach ($ annotationRegistrationService ->findAnnotatedClasses (ProjectionV2::class) as $ projectionClassName ) {
@@ -91,20 +80,23 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
9180 /**
9281 * Resolve stream configurations from FromStream attributes.
9382 *
94- * @return list<ProjectionConfiguration >
83+ * @return list<ProjectionComponentBuilder >
9584 */
96- private static function resolveFromStreamConfigs (
85+ private static function resolveConfigs (
9786 AnnotationFinder $ annotationRegistrationService ,
98- array $ projectionEventNames
99- ): array {
100- $ configs = [];
87+ array $ projectionEventNames
88+ ): array
89+ {
90+ $ extensions = [];
10191
10292 foreach ($ annotationRegistrationService ->findAnnotatedClasses (ProjectionV2::class) as $ classname ) {
10393 $ projectionAttribute = $ annotationRegistrationService ->getAttributeForClass ($ classname , ProjectionV2::class);
104- $ aggregateStreamAttributes = $ annotationRegistrationService ->getAnnotationsForClass ($ classname , FromAggregateStream::class);
10594 $ streamAttributes = [
10695 ...$ annotationRegistrationService ->getAnnotationsForClass ($ classname , FromStream::class),
107- ...array_map (fn (FromAggregateStream $ attribute ) => self ::resolveFromAggregateStream ($ annotationRegistrationService , $ attribute , $ projectionAttribute ->name ), $ aggregateStreamAttributes )
96+ ...\array_map (
97+ fn (FromAggregateStream $ aggregateStreamAttribute ) => self ::resolveFromAggregateStream ($ annotationRegistrationService , $ aggregateStreamAttribute , $ projectionAttribute ->name ),
98+ $ annotationRegistrationService ->getAnnotationsForClass ($ classname , FromAggregateStream::class)
99+ )
108100 ];
109101 $ partitionedAttribute = $ annotationRegistrationService ->findAttributeForClass ($ classname , Partitioned::class);
110102
@@ -115,20 +107,41 @@ private static function resolveFromStreamConfigs(
115107 $ projectionName = $ projectionAttribute ->name ;
116108 $ isPartitioned = $ partitionedAttribute !== null ;
117109
118- if ($ isPartitioned && ! $ streamAttributes ->aggregateType ) {
119- throw ConfigurationException::create ("Aggregate type must be provided for projection {$ projectionName } as partition header name is provided " );
110+ $ sources = [];
111+ foreach ($ streamAttributes as $ streamAttribute ) {
112+ if ($ isPartitioned && ! $ streamAttribute ->aggregateType ) {
113+ throw ConfigurationException::create ("Aggregate type must be provided for projection {$ projectionName } as partition header name is provided " );
114+ }
115+ if ($ isPartitioned ) {
116+ $ sources [$ streamAttribute ->stream .'. ' .$ streamAttribute ->aggregateType ] = new EventStoreAggregateStreamSourceBuilder (
117+ $ projectionName ,
118+ $ streamAttribute ->aggregateType ,
119+ $ streamAttribute ->stream ,
120+ $ projectionEventNames [$ projectionName ] ?? [],
121+ );
122+ $ extensions [] = new AggregateIdPartitionProviderBuilder (
123+ $ projectionName ,
124+ $ streamAttribute ->aggregateType ,
125+ $ streamAttribute ->stream ,
126+ );
127+ } else {
128+ $ sources [$ streamAttribute ->stream ] = new EventStoreGlobalStreamSourceBuilder (
129+ $ streamAttribute ->stream ,
130+ [$ projectionName ]
131+ );
132+ }
133+ }
134+ if (count ($ sources ) > 1 ) {
135+ $ extensions [] = new EventStoreMultiStreamSourceBuilder (
136+ $ sources ,
137+ [$ projectionName ],
138+ );
139+ } else {
140+ $ extensions [] = current ($ sources );
120141 }
121-
122- $ configs [] = [
123- 'projectionName ' => $ projectionName ,
124- 'streamName ' => $ streamAttributes ->stream ,
125- 'aggregateType ' => $ streamAttributes ->aggregateType ,
126- 'isPartitioned ' => $ isPartitioned ,
127- 'eventNames ' => $ projectionEventNames [$ projectionName ] ?? [],
128- ];
129142 }
130143
131- return $ configs ;
144+ return $ extensions ;
132145 }
133146
134147 /**
@@ -155,38 +168,6 @@ private static function resolveFromAggregateStream(
155168 return new FromStream ($ streamName , $ aggregateType , $ aggregateStreamAttribute ->eventStoreReferenceName );
156169 }
157170
158- /**
159- * Create stream source extensions based on resolved configuration.
160- *
161- * @param ProjectionConfiguration $config
162- * @return ProjectionComponentBuilder[]
163- */
164- private static function createStreamSourceExtensions (array $ config ): array
165- {
166- if ($ config ['isPartitioned ' ]) {
167- return [
168- new EventStoreAggregateStreamSourceBuilder (
169- $ config ['projectionName ' ],
170- $ config ['aggregateType ' ],
171- $ config ['streamName ' ],
172- $ config ['eventNames ' ],
173- ),
174- new AggregateIdPartitionProviderBuilder (
175- $ config ['projectionName ' ],
176- $ config ['aggregateType ' ],
177- $ config ['streamName ' ]
178- ),
179- ];
180- }
181-
182- return [
183- new EventStoreGlobalStreamSourceBuilder (
184- $ config ['streamName ' ],
185- [$ config ['projectionName ' ]],
186- ),
187- ];
188- }
189-
190171 public function prepare (Configuration $ messagingConfiguration , array $ extensionObjects , ModuleReferenceSearchService $ moduleReferenceSearchService , InterfaceToCallRegistry $ interfaceToCallRegistry ): void
191172 {
192173 $ dbalConfiguration = ExtensionObjectResolver::resolveUnique (DbalConfiguration::class, $ extensionObjects , DbalConfiguration::createWithDefaults ());
0 commit comments