Skip to content

Commit ca54a7c

Browse files
committed
ack
1 parent b8ea8d0 commit ca54a7c

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

packages/Amqp/src/AmqpOutboundChannelAdapter.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Ecotone\Messaging\Conversion\ConversionService;
1212
use Ecotone\Messaging\Message;
1313
use Ecotone\Messaging\MessageHandler;
14+
use Enqueue\AmqpExt\AmqpContext;
1415
use Interop\Amqp\AmqpMessage;
1516
use Interop\Amqp\Impl\AmqpTopic;
1617

@@ -76,11 +77,19 @@ public function handle(Message $message): void
7677
$messageToSend
7778
->setDeliveryMode($this->defaultPersistentDelivery ? AmqpMessage::DELIVERY_MODE_PERSISTENT : AmqpMessage::DELIVERY_MODE_NON_PERSISTENT);
7879

80+
/** @var AmqpContext $context */
81+
$context = $this->connectionFactory->createContext();
82+
/** Ensures no messages are lost along the way when heartbeat is lost and ensures messages was peristed on the Broker side. Without this message can be simply "swallowed" without throwing exception */
83+
$context->getExtChannel()->confirmSelect();
84+
7985
$this->connectionFactory->getProducer()
8086
->setTimeToLive($outboundMessage->getTimeToLive())
8187
->setDelayStrategy(new HeadersExchangeDelayStrategy())
8288
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
8389
// this allow for having queue per delay instead of queue per delay + exchangeName
8490
->send(new AmqpTopic($exchangeName), $messageToSend);
91+
92+
$context->getExtChannel()->setConfirmCallback(fn() => false, fn() => throw new \RuntimeException("Message was not sent to exchange {$exchangeName} with routing key {$routingKey}"));
93+
$context->getExtChannel()->waitForConfirm();
8594
}
8695
}

packages/Ecotone/src/Messaging/Channel/PollableChannel/SendRetries/SendRetryChannelInterceptor.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
4141
if ($exception !== null) {
4242
$attempt = 1;
4343
while ($this->retryTemplate->canBeCalledNextTime($attempt)) {
44-
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}", [
44+
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}. Exception message: `{$exception->getMessage()}`", [
4545
'exception' => $exception->getMessage(),
4646
'relatedChannel' => $this->relatedChannel,
4747
]);
@@ -60,7 +60,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
6060
}
6161
}
6262

63-
$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done", [
63+
$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done. Exception message: `{$exception->getMessage()}`", [
6464
'exception' => $exception->getMessage(),
6565
'relatedChannel' => $this->relatedChannel,
6666
]);

0 commit comments

Comments
 (0)