Skip to content

Commit f1bc414

Browse files
committed
amqp reconnect on sending failure
1 parent 411efe6 commit f1bc414

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

packages/Amqp/src/AmqpOutboundChannelAdapter.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
use Ecotone\Messaging\MessageHandler;
1212
use Interop\Amqp\AmqpMessage;
1313
use Interop\Amqp\Impl\AmqpTopic;
14+
use AMQPChannelException;
15+
use AMQPConnectionException;
1416

1517
/**
1618
* @author Dariusz Gafka <support@simplycodedsoftware.com>
@@ -74,11 +76,17 @@ public function handle(Message $message): void
7476
$messageToSend
7577
->setDeliveryMode($this->defaultPersistentDelivery ? AmqpMessage::DELIVERY_MODE_PERSISTENT : AmqpMessage::DELIVERY_MODE_NON_PERSISTENT);
7678

77-
$this->connectionFactory->getProducer()
78-
->setTimeToLive($outboundMessage->getTimeToLive())
79-
->setDelayStrategy(new HeadersExchangeDelayStrategy())
80-
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
79+
try {
80+
$this->connectionFactory->getProducer()
81+
->setTimeToLive($outboundMessage->getTimeToLive())
82+
->setDelayStrategy(new HeadersExchangeDelayStrategy())
83+
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
8184
// this allow for having queue per delay instead of queue per delay + exchangeName
82-
->send(new AmqpTopic($exchangeName), $messageToSend);
85+
->send(new AmqpTopic($exchangeName), $messageToSend);
86+
}catch (AMQPConnectionException|AMQPChannelException $exception) {
87+
$this->connectionFactory->reconnect();
88+
89+
throw $exception;
90+
}
8391
}
8492
}

0 commit comments

Comments
 (0)