Skip to content

Commit f46c3b4

Browse files
committed
fix for transactions
1 parent ca54a7c commit f46c3b4

File tree

6 files changed

+27
-41
lines changed

6 files changed

+27
-41
lines changed

packages/Amqp/src/AmqpOutboundChannelAdapter.php

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use AMQPChannelException;
88
use AMQPConnectionException;
9+
use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
910
use Ecotone\Enqueue\CachedConnectionFactory;
1011
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
1112
use Ecotone\Messaging\Conversion\ConversionService;
@@ -39,7 +40,8 @@ public function __construct(
3940
private bool $defaultPersistentDelivery,
4041
private bool $autoDeclare,
4142
private OutboundMessageConverter $outboundMessageConverter,
42-
private ConversionService $conversionService
43+
private ConversionService $conversionService,
44+
private AmqpTransactionInterceptor $amqpTransactionInterceptor,
4345
) {
4446
}
4547

@@ -79,8 +81,10 @@ public function handle(Message $message): void
7981

8082
/** @var AmqpContext $context */
8183
$context = $this->connectionFactory->createContext();
82-
/** Ensures no messages are lost along the way when heartbeat is lost and ensures messages was peristed on the Broker side. Without this message can be simply "swallowed" without throwing exception */
83-
$context->getExtChannel()->confirmSelect();
84+
if (!$this->amqpTransactionInterceptor->isRunningInTransaction()) {
85+
/** Ensures no messages are lost along the way when heartbeat is lost and ensures messages was peristed on the Broker side. Without this message can be simply "swallowed" without throwing exception */
86+
$context->getExtChannel()->confirmSelect();
87+
}
8488

8589
$this->connectionFactory->getProducer()
8690
->setTimeToLive($outboundMessage->getTimeToLive())
@@ -89,7 +93,8 @@ public function handle(Message $message): void
8993
// this allow for having queue per delay instead of queue per delay + exchangeName
9094
->send(new AmqpTopic($exchangeName), $messageToSend);
9195

92-
$context->getExtChannel()->setConfirmCallback(fn() => false, fn() => throw new \RuntimeException("Message was not sent to exchange {$exchangeName} with routing key {$routingKey}"));
93-
$context->getExtChannel()->waitForConfirm();
96+
if (!$this->amqpTransactionInterceptor->isRunningInTransaction()) {
97+
$context->getExtChannel()->waitForConfirm();
98+
}
9499
}
95100
}

packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Ecotone\Amqp;
66

7+
use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
78
use Ecotone\Enqueue\CachedConnectionFactory;
89
use Ecotone\Enqueue\EnqueueOutboundChannelAdapterBuilder;
910
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
@@ -127,6 +128,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
127128
$this->autoDeclare,
128129
$outboundMessageConverter,
129130
new Reference(ConversionService::REFERENCE_NAME),
131+
Reference::to(AmqpTransactionInterceptor::class),
130132
]);
131133
}
132134
}

packages/Amqp/src/AmqpReconnectableConnectionFactory.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public function createContext(): Context
3737
$this->reconnect();
3838
}
3939

40-
return $this->connectionFactory->createContext();
40+
$context = $this->connectionFactory->createContext();
41+
$context->getExtChannel()->setConfirmCallback(fn() => false, fn() => throw new \RuntimeException("Message was failed to be persisted in RabbitMQ instance. Check RabbitMQ server logs."));
42+
43+
return $context;
4144
}
4245

4346
public function getConnectionInstanceId(): string

packages/Amqp/src/Transaction/AmqpTransactionInterceptor.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public function transactional(
9090
$this->logger->info(
9191
'AMQP transaction was roll backed',
9292
$message,
93-
$exception
93+
[
94+
'exception' => $exception
95+
]
9496
);
9597

9698
throw $exception;
@@ -104,4 +106,9 @@ public function transactional(
104106
$this->isRunningTransaction = false;
105107
return $result;
106108
}
109+
110+
public function isRunningInTransaction(): bool
111+
{
112+
return $this->isRunningTransaction;
113+
}
107114
}

packages/Amqp/tests/Configuration/AmqpModuleTest.php

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,39 +48,6 @@ public function test_registering_amqp_backed_message_channel()
4848
);
4949
}
5050

51-
public function test_registering_amqp_backed_message_channel_with_application_media_type()
52-
{
53-
$amqpChannelBuilder = AmqpBackedMessageChannelBuilder::create('amqpChannel');
54-
$messagingSystem = MessagingSystemConfiguration::prepareWithDefaults(
55-
InMemoryModuleMessaging::createWith(
56-
[AmqpModule::create(InMemoryAnnotationFinder::createEmpty(), InterfaceToCallRegistry::createEmpty())],
57-
[
58-
ServiceConfiguration::createWithDefaults()
59-
->withDefaultSerializationMediaType(MediaType::APPLICATION_JSON),
60-
$amqpChannelBuilder,
61-
]
62-
)
63-
)
64-
->registerMessageChannel($amqpChannelBuilder)
65-
->registerConverter(new ArrayToJsonConverterBuilder())
66-
->buildMessagingSystemFromConfiguration(
67-
InMemoryReferenceSearchService::createWith(
68-
[
69-
AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),
70-
]
71-
)
72-
);
73-
74-
/** @var PollableChannel $channel */
75-
$channel = $messagingSystem->getMessageChannelByName('amqpChannel');
76-
$channel->send(MessageBuilder::withPayload([1, 2, 3])->setContentType(MediaType::createApplicationXPHPArray())->build());
77-
78-
$this->assertEquals(
79-
'[1,2,3]',
80-
$channel->receive()->getPayload()
81-
);
82-
}
83-
8451
public function test_registering_amqp_configuration()
8552
{
8653
$amqpExchange = AmqpExchange::createDirectExchange('exchange');

packages/Amqp/tests/Integration/GeneralAmqpTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ public function test_messages_are_delivered_after_lost_heartbeat(): void
4747
{
4848
$ecotone = $this->bootstrapEcotone(
4949
namespaces: ['Test\Ecotone\Amqp\Fixture\Order'],
50-
services: [new OrderService(), new OrderErrorHandler(), 'logger' => new EchoLogger()],
50+
services: [new OrderService(), new OrderErrorHandler(),
51+
// 'logger' => new EchoLogger()
52+
],
5153
amqpConfig: ['heartbeat' => 2]
5254
);
5355

0 commit comments

Comments
 (0)