Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/Amqp/src/AmqpBackedMessageChannelBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ public static function create(
);
}

private function getAmqpOutboundChannelAdapter(): AmqpOutboundChannelAdapterBuilder
{
return $this->outboundChannelAdapter;
}

public function withPublisherAcknowledgments(bool $enabled): self
{
$this->getAmqpOutboundChannelAdapter()->withPublisherAcknowledgments($enabled);

return $this;
}

public function getMessageChannelName(): string
{
return $this->channelName;
Expand Down
47 changes: 28 additions & 19 deletions packages/Amqp/src/AmqpOutboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

use AMQPChannelException;
use AMQPConnectionException;
use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHandler;
use Enqueue\AmqpExt\AmqpContext;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\Impl\AmqpTopic;

Expand All @@ -29,16 +31,18 @@ class AmqpOutboundChannelAdapter implements MessageHandler
private $initialized = false;

public function __construct(
private CachedConnectionFactory $connectionFactory,
private AmqpAdmin $amqpAdmin,
private string $exchangeName,
private ?string $routingKey,
private ?string $routingKeyFromHeaderName,
private ?string $exchangeFromHeaderName,
private bool $defaultPersistentDelivery,
private bool $autoDeclare,
private OutboundMessageConverter $outboundMessageConverter,
private ConversionService $conversionService
private CachedConnectionFactory $connectionFactory,
private AmqpAdmin $amqpAdmin,
private string $exchangeName,
private ?string $routingKey,
private ?string $routingKeyFromHeaderName,
private ?string $exchangeFromHeaderName,
private bool $defaultPersistentDelivery,
private bool $autoDeclare,
private bool $publisherAcknowledgments,
private OutboundMessageConverter $outboundMessageConverter,
private ConversionService $conversionService,
private AmqpTransactionInterceptor $amqpTransactionInterceptor,
) {
}

Expand Down Expand Up @@ -76,17 +80,22 @@ public function handle(Message $message): void
$messageToSend
->setDeliveryMode($this->defaultPersistentDelivery ? AmqpMessage::DELIVERY_MODE_PERSISTENT : AmqpMessage::DELIVERY_MODE_NON_PERSISTENT);

try {
$this->connectionFactory->getProducer()
->setTimeToLive($outboundMessage->getTimeToLive())
->setDelayStrategy(new HeadersExchangeDelayStrategy())
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
/** @var AmqpContext $context */
$context = $this->connectionFactory->createContext();
if ($this->publisherAcknowledgments && !$this->amqpTransactionInterceptor->isRunningInTransaction()) {
/** Ensures no messages are lost along the way when heartbeat is lost and ensures messages was peristed on the Broker side. Without this message can be simply "swallowed" without throwing exception */
$context->getExtChannel()->confirmSelect();
}

$this->connectionFactory->getProducer()
->setTimeToLive($outboundMessage->getTimeToLive())
->setDelayStrategy(new HeadersExchangeDelayStrategy())
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
// this allow for having queue per delay instead of queue per delay + exchangeName
->send(new AmqpTopic($exchangeName), $messageToSend);
} catch (AMQPConnectionException|AMQPChannelException $exception) {
$this->connectionFactory->reconnect();
->send(new AmqpTopic($exchangeName), $messageToSend);

throw $exception;
if ($this->publisherAcknowledgments && !$this->amqpTransactionInterceptor->isRunningInTransaction()) {
$context->getExtChannel()->waitForConfirm();
}
}
}
11 changes: 11 additions & 0 deletions packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Amqp;

use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Enqueue\EnqueueOutboundChannelAdapterBuilder;
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
Expand All @@ -26,6 +27,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui
private string $exchangeName;
private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE;
private array $staticHeadersToAdd = [];
private bool $publisherAcknowledgments = true;

private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName)
{
Expand Down Expand Up @@ -56,6 +58,13 @@ public function withDefaultRoutingKey(string $routingKey): self
return $this;
}

public function withPublisherAcknowledgments(bool $publisherAcknowledgments): self
{
$this->publisherAcknowledgments = $publisherAcknowledgments;

return $this;
}

/**
* @param string $headerName
*
Expand Down Expand Up @@ -125,8 +134,10 @@ public function compile(MessagingContainerBuilder $builder): Definition
$this->exchangeFromHeader,
$this->defaultPersistentDelivery,
$this->autoDeclare,
$this->publisherAcknowledgments,
$outboundMessageConverter,
new Reference(ConversionService::REFERENCE_NAME),
Reference::to(AmqpTransactionInterceptor::class),
]);
}
}
9 changes: 8 additions & 1 deletion packages/Amqp/src/AmqpReconnectableConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public function createContext(): Context
$this->reconnect();
}

return $this->connectionFactory->createContext();
$context = $this->connectionFactory->createContext();
$context->getExtChannel()->setConfirmCallback(fn() => false, fn() => throw new \RuntimeException("Message was failed to be persisted in RabbitMQ instance. Check RabbitMQ server logs."));

return $context;
}

public function getConnectionInstanceId(): string
Expand All @@ -63,6 +66,10 @@ public function isDisconnected(?Context $context): bool

Assert::isSubclassOf($context, AmqpContext::class, 'Context must be ' . AmqpContext::class);

if (!$context->getExtChannel()->getConnection()->isConnected()) {
return true;
}

return ! $context->getExtChannel()->isConnected();
}

Expand Down
9 changes: 8 additions & 1 deletion packages/Amqp/src/Transaction/AmqpTransactionInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public function transactional(
$this->logger->info(
'AMQP transaction was roll backed',
$message,
$exception
[
'exception' => $exception
]
);

throw $exception;
Expand All @@ -104,4 +106,9 @@ public function transactional(
$this->isRunningTransaction = false;
return $result;
}

public function isRunningInTransaction(): bool
{
return $this->isRunningTransaction;
}
}
8 changes: 2 additions & 6 deletions packages/Amqp/src/Transaction/AmqpTransactionModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,14 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$amqpConfiguration = ExtensionObjectResolver::resolveUnique(AmqpConfiguration::class, $extensionObjects, AmqpConfiguration::createWithDefaults());
;

$isTransactionWrapperEnabled = false;
if ($amqpConfiguration->isTransactionOnAsynchronousEndpoints()) {
$pointcut .= '||' . AsynchronousRunningEndpoint::class;
$isTransactionWrapperEnabled = true;
}
if ($amqpConfiguration->isTransactionOnCommandBus()) {
$pointcut .= '||' . CommandBus::class . '';
$isTransactionWrapperEnabled = true;
$pointcut .= '||' . CommandBus::class;
}
if ($amqpConfiguration->isTransactionOnConsoleCommands()) {
$pointcut .= '||' . ConsoleCommand::class . '';
$isTransactionWrapperEnabled = true;
$pointcut .= '||' . ConsoleCommand::class;
}
if ($amqpConfiguration->getDefaultConnectionReferenceNames()) {
$connectionFactories = $amqpConfiguration->getDefaultConnectionReferenceNames();
Expand Down
33 changes: 0 additions & 33 deletions packages/Amqp/tests/Configuration/AmqpModuleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,6 @@ public function test_registering_amqp_backed_message_channel()
);
}

public function test_registering_amqp_backed_message_channel_with_application_media_type()
{
$amqpChannelBuilder = AmqpBackedMessageChannelBuilder::create('amqpChannel');
$messagingSystem = MessagingSystemConfiguration::prepareWithDefaults(
InMemoryModuleMessaging::createWith(
[AmqpModule::create(InMemoryAnnotationFinder::createEmpty(), InterfaceToCallRegistry::createEmpty())],
[
ServiceConfiguration::createWithDefaults()
->withDefaultSerializationMediaType(MediaType::APPLICATION_JSON),
$amqpChannelBuilder,
]
)
)
->registerMessageChannel($amqpChannelBuilder)
->registerConverter(new ArrayToJsonConverterBuilder())
->buildMessagingSystemFromConfiguration(
InMemoryReferenceSearchService::createWith(
[
AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),
]
)
);

/** @var PollableChannel $channel */
$channel = $messagingSystem->getMessageChannelByName('amqpChannel');
$channel->send(MessageBuilder::withPayload([1, 2, 3])->setContentType(MediaType::createApplicationXPHPArray())->build());

$this->assertEquals(
'[1,2,3]',
$channel->receive()->getPayload()
);
}

public function test_registering_amqp_configuration()
{
$amqpExchange = AmqpExchange::createDirectExchange('exchange');
Expand Down
30 changes: 30 additions & 0 deletions packages/Amqp/tests/Integration/AmqpMessageChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,36 @@ public function test_sending_and_receiving_message_from_amqp_message_channel()
$this->assertNull($messageChannel->receiveWithTimeout(1));
}

public function test_sending_and_receiving_without_delivery_guarantee()
{
$queueName = Uuid::uuid4()->toString();
$messagePayload = 'some';

$ecotoneLite = EcotoneLite::bootstrapForTesting(
containerOrAvailableServices: [
AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE]))
->withExtensionObjects([
AmqpBackedMessageChannelBuilder::create($queueName)
->withPublisherAcknowledgments(false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withPublisherConfirms maybe better?

])
);

/** @var PollableChannel $messageChannel */
$messageChannel = $ecotoneLite->getMessageChannelByName($queueName);

$messageChannel->send(MessageBuilder::withPayload($messagePayload)->build());

$this->assertEquals(
'some',
$messageChannel->receiveWithTimeout(100)->getPayload()
);

$this->assertNull($messageChannel->receiveWithTimeout(1));
}

public function test_sending_and_receiving_message_from_amqp_using_consumer()
{
$queueName = 'orders';
Expand Down
26 changes: 24 additions & 2 deletions packages/Amqp/tests/Integration/GeneralAmqpTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Handler\Logger\EchoLogger;
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Test\Ecotone\Amqp\AmqpMessagingTestCase;
use Test\Ecotone\Amqp\Fixture\Order\OrderErrorHandler;
Expand Down Expand Up @@ -42,6 +43,27 @@ public function test_products_are_on_list_after_being_ordered(): void
);
}

public function test_messages_are_delivered_after_lost_heartbeat(): void
{
$ecotone = $this->bootstrapEcotone(
namespaces: ['Test\Ecotone\Amqp\Fixture\Order'],
services: [new OrderService(), new OrderErrorHandler(),
// 'logger' => new EchoLogger()
],
amqpConfig: ['heartbeat' => 2]
);

$ecotone->sendCommandWithRoutingKey('order.register', 'milk');
sleep(5);
$ecotone->sendCommandWithRoutingKey('order.register', 'salt');
sleep(5);
$ecotone->sendCommandWithRoutingKey('order.register', 'sunflower');
$ecotone->run('orders');
$ecotone->run('orders');
$ecotone->run('orders');
self::assertEquals(['milk', 'salt', 'sunflower'], $ecotone->sendQueryWithRouting('order.getOrders'));
}

public function test_adding_product_to_shopping_cart_with_publisher_and_consumer(): void
{
$ecotone = $this->bootstrapEcotone(
Expand All @@ -58,10 +80,10 @@ public function test_adding_product_to_shopping_cart_with_publisher_and_consumer
);
}

private function bootstrapEcotone(array $namespaces, array $services): FlowTestSupport
private function bootstrapEcotone(array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport
{
return EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory()], $services),
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory($amqpConfig)], $services),
configuration: ServiceConfiguration::createWithDefaults()
->withEnvironment('prod')
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
if ($exception !== null) {
$attempt = 1;
while ($this->retryTemplate->canBeCalledNextTime($attempt)) {
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}", [
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}. Exception message: `{$exception->getMessage()}`", [
'exception' => $exception->getMessage(),
'relatedChannel' => $this->relatedChannel,
]);
Expand All @@ -60,7 +60,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
}
}

$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done", [
$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done. Exception message: `{$exception->getMessage()}`", [
'exception' => $exception->getMessage(),
'relatedChannel' => $this->relatedChannel,
]);
Expand Down
1 change: 1 addition & 0 deletions packages/Enqueue/src/CachedConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public function createContext(): Context
public function reconnect(): void
{
$this->connectionFactory->reconnect();
$this->cachedContext = [];
}

public function getConsumer(Destination $destination): Consumer
Expand Down
Loading