Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ public static function createBoolHeaderFilter(string $headerName, ?bool $default
);
}

/**
* Creates a filter that returns the NEGATED value of a boolean header.
* When header is true, the message PASSES through.
* When header is false, the message is DISCARDED.
*
* @param bool|null $defaultResultWhenHeaderIsMissing When no presented exception will be thrown on missing header
*/
public static function createNotBoolHeaderFilter(string $headerName, ?bool $defaultResultWhenHeaderIsMissing = null): self
{
return new self(
new NotBoolHeaderBasedFilter($headerName, $defaultResultWhenHeaderIsMissing),
InterfaceToCallReference::create(NotBoolHeaderBasedFilter::class, 'filter')
);
}

/**
* @inheritDoc
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Handler\Filter;

use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Message;

/**
* Filter that returns the NEGATED value of a boolean header.
* When header is true, filter returns false (pass through).
* When header is false, filter returns true (discard).
*
* licence Apache-2.0
*/
final class NotBoolHeaderBasedFilter implements DefinedObject
{
public function __construct(private string $headerName, private ?bool $defaultResultWhenHeaderIsMissing)
{
}

public function filter(Message $message): bool
{
if (! is_null($this->defaultResultWhenHeaderIsMissing) && ! $message->getHeaders()->containsKey($this->headerName)) {
return $this->defaultResultWhenHeaderIsMissing;
}

return ! (bool)$message->getHeaders()->get($this->headerName);
}

public function getDefinition(): Definition
{
return new Definition(self::class, [
$this->headerName,
$this->defaultResultWhenHeaderIsMissing,
]);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MessageHeadersPropagatorInterceptor
private array $currentlyPropagatedHeaders = [];
private bool $isPollingConsumer = false;

public function storeHeaders(MethodInvocation $methodInvocation, Message $message, ?PropagateHeaders $propagateHeaders = null)
public function storeHeaders(MethodInvocation|\Closure $methodInvocation, Message $message, ?PropagateHeaders $propagateHeaders = null)
{
if ($propagateHeaders !== null && ! $propagateHeaders->doPropagation()) {
$userlandHeaders = [];
Expand All @@ -40,7 +40,11 @@ public function storeHeaders(MethodInvocation $methodInvocation, Message $messag

$this->currentlyPropagatedHeaders[] = $userlandHeaders;
try {
$reply = $methodInvocation->proceed();
if ($methodInvocation instanceof MethodInvocation) {
$reply = $methodInvocation->proceed();
} else {
$reply = $methodInvocation();
}
} finally {
array_pop($this->currentlyPropagatedHeaders);
}
Expand Down

This file was deleted.

37 changes: 37 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/ProjectionDeployment.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;

/**
* Configure projection deployment settings for blue/green deployment scenarios.
*/
#[Attribute(Attribute::TARGET_CLASS)]
class ProjectionDeployment
{
public function __construct(
/**
* When true, projection will not be automatically initialized.
* It will require manual trigger
*
* Default: false (automatic initialization)
*/
public readonly bool $manualKickOff = false,
/**
* When false, emitted events via EventStreamEmitter will not be published.
* Use this for blue/green deployment to rebuild projection without
* re-emitting events to downstream consumers.
*
* Default: true (events are emitted)
*/
public readonly bool $live = true,
) {
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Ecotone\Modelling\Config\Routing\BusRouteSelector;
use Ecotone\Modelling\Config\Routing\BusRoutingKeyResolver;
use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder;
use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor;
use Ecotone\Projecting\EcotoneProjectorExecutor;
use Ecotone\Projecting\ProjectingHeaders;

Expand All @@ -33,6 +34,7 @@ public function __construct(
private string $projectionName,
private ?string $partitionHeader = null,
private bool $automaticInitialization = true,
private bool $isLive = true,
private array $namedEvents = [],
private ?string $initChannel = null,
private ?string $deleteChannel = null,
Expand Down Expand Up @@ -100,12 +102,14 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
{
$routerProcessor = $this->buildExecutionRouter($builder);
return new Definition(EcotoneProjectorExecutor::class, [
new Reference(MessagingEntrypoint::class), // Headers propagation is required for EventStreamEmitter
new Reference(MessagingEntrypoint::class),
new Reference(MessageHeadersPropagatorInterceptor::class),
$this->projectionName,
$routerProcessor,
$this->initChannel,
$this->deleteChannel,
$this->flushChannel,
$this->isLive,
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
use Ecotone\Projecting\Attribute\Partitioned;
use Ecotone\Projecting\Attribute\Polling;
use Ecotone\Projecting\Attribute\ProjectionBatchSize;
use Ecotone\Projecting\Attribute\ProjectionConfiguration;
use Ecotone\Projecting\Attribute\ProjectionDeployment;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\Attribute\Streaming;
Expand Down Expand Up @@ -80,13 +80,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class);
$pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class);
$streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class);
$projectionConfiguration = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionConfiguration::class) ?? new ProjectionConfiguration();
$projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class);
$partitionAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Partitioned::class);

$partitionHeaderName = $partitionAttribute?->partitionHeaderName;
$automaticInitialization = $partitionAttribute ? true : $projectionConfiguration->automaticInitialization;
// Resolve automatic initialization: manualKickOff: true means automaticInitialization: false
$automaticInitialization = self::resolveAutomaticInitialization($partitionAttribute, $projectionDeployment);
$isLive = $projectionDeployment?->live ?? true;

$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);
$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $isLive, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);

$asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName);
$isPolling = $pollingAttribute !== null;
Expand Down Expand Up @@ -276,4 +278,22 @@ private static function verifyCorrectApiUsage(bool $isPolling, ?string $asynchro
);
}
}

private static function resolveAutomaticInitialization(
?Partitioned $partitionAttribute,
?ProjectionDeployment $projectionDeployment,
): bool {
// Partitioned projections always require automatic initialization
if ($partitionAttribute !== null) {
return true;
}

// ProjectionDeployment: manualKickOff: true means automaticInitialization: false
if ($projectionDeployment !== null) {
return ! $projectionDeployment->manualKickOff;
}

// Default: automatic initialization is enabled
return true;
}
}
21 changes: 14 additions & 7 deletions packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\Event;

use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor;
use function is_null;

class EcotoneProjectorExecutor implements ProjectorExecutor
{
public function __construct(
private MessagingEntrypoint $messagingEntrypoint,
private MessageHeadersPropagatorInterceptor $messageHeadersPropagatorInterceptor,
private string $projectionName, // this is required for event stream emitter so it can create a stream with this name
private MessageProcessor $routerProcessor,
private ?string $initChannel = null,
private ?string $deleteChannel = null,
private ?string $flushChannel = null,
private bool $isLive = true,
) {
}

Expand All @@ -33,16 +36,20 @@ public function project(Event $event, mixed $userState = null): mixed
$metadata = $event->getMetadata();
$metadata[ProjectingHeaders::PROJECTION_STATE] = $userState ?? null;
$metadata[ProjectingHeaders::PROJECTION_EVENT_NAME] = $event->getEventName();

// Those three headers are required by EventStreamEmitter
$metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName;
$metadata[ProjectingHeaders::PROJECTION_IS_REBUILDING] = false;
$metadata[ProjectingHeaders::PROJECTION_LIVE] = $this->isLive;
$metadata[MessageHeaders::STREAM_BASED_SOURCED] = true; // this one is required for correct header propagation in EventStreamEmitter...
$metadata[MessageHeaders::REPLY_CHANNEL] = $responseQueue = new QueueChannel('response_channel');
$this->routerProcessor->process(
MessageBuilder::withPayload($event->getPayload())
->setMultipleHeaders($metadata)
->build()

$requestMessage = MessageBuilder::withPayload($event->getPayload())
->setMultipleHeaders($metadata)
->build();

$this->messageHeadersPropagatorInterceptor->storeHeaders(
function () use ($requestMessage) {
$this->routerProcessor->process($requestMessage);
},
$requestMessage
);
$response = $responseQueue->receive();
$newUserState = $response?->getPayload();
Expand Down
6 changes: 5 additions & 1 deletion packages/Ecotone/src/Projecting/ProjectingHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class ProjectingHeaders
public const PROJECTION_STATE = 'projection.state';
public const PROJECTION_NAME = 'projection.name';
public const PROJECTION_EVENT_NAME = 'projection.event_name';
public const PROJECTION_IS_REBUILDING = 'projection.is_rebuilding';
/**
* Indicates whether the projection is live and should emit events.
* When false, events emitted via EventStreamEmitter will be filtered out.
*/
public const PROJECTION_LIVE = 'projection.live';
public const MANUAL_INITIALIZATION = 'projection.manual_initialization';
}
12 changes: 6 additions & 6 deletions packages/Ecotone/tests/Projecting/ProjectingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Event;
use Ecotone\Projecting\Attribute\Partitioned;
use Ecotone\Projecting\Attribute\ProjectionConfiguration;
use Ecotone\Projecting\Attribute\ProjectionDeployment;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder;
Expand Down Expand Up @@ -178,7 +178,7 @@ public function init(): void

public function test_it_skips_execution_when_automatic_initialization_is_off_and_not_initialized(): void
{
$projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionConfiguration(automaticInitialization: false)] class {
$projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionDeployment(manualKickOff: true)] class {
public const TICKET_CREATED = 'ticket.created';
public array $projectedEvents = [];

Expand Down Expand Up @@ -261,7 +261,7 @@ public function init(): void

public function test_auto_initialization_mode_processes_events(): void
{
$projection = new #[ProjectionV2('auto_projection'), ProjectionConfiguration(automaticInitialization: true)] class {
$projection = new #[ProjectionV2('auto_projection'), ProjectionDeployment(manualKickOff: false)] class {
public const TICKET_CREATED = 'ticket.created';
public array $projectedEvents = [];
public int $initCallCount = 0;
Expand Down Expand Up @@ -301,7 +301,7 @@ public function init(): void

public function test_skip_initialization_mode_skips_events_when_not_initialized(): void
{
$projection = new #[ProjectionV2('skip_projection'), ProjectionConfiguration(automaticInitialization: false)] class {
$projection = new #[ProjectionV2('skip_projection'), ProjectionDeployment(manualKickOff: true)] class {
public const TICKET_CREATED = 'ticket.created';
public array $projectedEvents = [];
public int $initCallCount = 0;
Expand Down Expand Up @@ -341,7 +341,7 @@ public function init(): void

public function test_skip_mode_with_multiple_events(): void
{
$projection = new #[ProjectionV2('skip_multiple_events'), ProjectionConfiguration(automaticInitialization: false)] class {
$projection = new #[ProjectionV2('skip_multiple_events'), ProjectionDeployment(manualKickOff: true)] class {
public const TICKET_CREATED = 'ticket.created';
public array $projectedEvents = [];
public int $initCallCount = 0;
Expand Down Expand Up @@ -384,7 +384,7 @@ public function init(): void

public function test_auto_mode_with_multiple_events(): void
{
$projection = new #[ProjectionV2('auto_multiple_events'), ProjectionConfiguration(automaticInitialization: true)] class {
$projection = new #[ProjectionV2('auto_multiple_events'), ProjectionDeployment(manualKickOff: false)] class {
public const TICKET_CREATED = 'ticket.created';
public array $projectedEvents = [];
public int $initCallCount = 0;
Expand Down
5 changes: 2 additions & 3 deletions packages/PdoEventSourcing/src/Config/EventSourcingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,7 @@ private function registerEventStreamEmitter(Configuration $configuration, EventS
$linkingRouterHandler =
MessageProcessorActivatorBuilder::create()
->withInputChannelName(Uuid::uuid4()->toString())
/** linkTo can be used outside of Projection, then we should NOT filter events out */
->chain(MessageFilterBuilder::createBoolHeaderFilter(ProjectionEventHandler::PROJECTION_IS_REBUILDING, false))
->chain(MessageFilterBuilder::createNotBoolHeaderFilter(ProjectionEventHandler::PROJECTION_LIVE, false))
->chain(RouterProcessorBuilder::createRecipientListRouter([
$eventStoreHandler->getInputMessageChannelName(),
$eventBusChannelName,
Expand All @@ -539,7 +538,7 @@ private function registerEventStreamEmitter(Configuration $configuration, EventS
MessageProcessorActivatorBuilder::create()
->withInputChannelName(Uuid::uuid4()->toString())
->chain(new Definition(StreamNameMapper::class))
->chain(MessageFilterBuilder::createBoolHeaderFilter(ProjectionEventHandler::PROJECTION_IS_REBUILDING))
->chain(MessageFilterBuilder::createNotBoolHeaderFilter(ProjectionEventHandler::PROJECTION_LIVE))
->chain(RouterProcessorBuilder::createRecipientListRouter([
$eventStoreHandler->getInputMessageChannelName(),
$eventBusChannelName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ProjectionEventHandler
{
public const PROJECTION_STATE = ProjectingHeaders::PROJECTION_STATE;
public const PROJECTION_EVENT_NAME = ProjectingHeaders::PROJECTION_EVENT_NAME;
public const PROJECTION_IS_REBUILDING = ProjectingHeaders::PROJECTION_IS_REBUILDING;
public const PROJECTION_LIVE = ProjectingHeaders::PROJECTION_LIVE;
public const PROJECTION_NAME = ProjectingHeaders::PROJECTION_NAME;

public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,15 @@ public function run(string $projectionName, ProjectionStreamSource $projectionSt
$routerChannel = $this->projectionSetupConfigurations[$projectionName]->getActionRouterChannel();
$conversionService = $this->conversionService;
$projection = $projection->whenAny(function ($state, Message $event) use ($projectionName, $status, $messagingEntrypoint, $routerChannel, $conversionService): mixed {
$isLive = $status != \Ecotone\EventSourcing\ProjectionStatus::REBUILDING();
$state = $messagingEntrypoint->sendWithHeaders(
$event->payload(),
array_merge(
$event->metadata(),
[
ProjectionEventHandler::PROJECTION_STATE => $state,
ProjectionEventHandler::PROJECTION_EVENT_NAME => $event->messageName(),
ProjectionEventHandler::PROJECTION_IS_REBUILDING => $status == \Ecotone\EventSourcing\ProjectionStatus::REBUILDING(),
ProjectionEventHandler::PROJECTION_LIVE => $isLive,
ProjectionEventHandler::PROJECTION_NAME => $projectionName,
MessageHeaders::STREAM_BASED_SOURCED => true,
]
Expand Down
Loading