Description
Actual Behavior
Kafka Sender

```java
KafkaSender<Object, Object> kafkaSender;
```

Create sender options and update kafkaSender:

```java
this.senderOptions = SenderOptions.<Object, Object>create(configurationProperties);
this.kafkaSender = KafkaSender.create(senderOptions);
```
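For reference, a minimal sketch of how `configurationProperties` and the sender might be assembled in this scenario; the bootstrap servers, serializers, and the surrounding class/method are assumptions, while `acks=0` and `buffer.memory=33554432` are taken from the configuration described further down:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

class SenderSetup {

    // Hypothetical assembly of the producer configuration used in this report.
    static KafkaSender<Object, Object> createSender() {
        Map<String, Object> configurationProperties = new HashMap<>();
        configurationProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");                // placeholder
        configurationProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   // assumption
        configurationProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // assumption
        configurationProperties.put(ProducerConfig.ACKS_CONFIG, "0");                                      // acks=0, as in this report
        configurationProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);                       // 32 MB, as in this report

        SenderOptions<Object, Object> senderOptions = SenderOptions.create(configurationProperties);
        return KafkaSender.create(senderOptions);
    }
}
```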
Producer Record

```java
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, value);
```

Sender Record

```java
SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, requestId);
```

Send API

```java
kafkaSender.send(Mono.just(senderRecord)).subscribe();
```
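For context, `send()` returns a `Flux<SenderResult<T>>`, where each result carries the `requestId` passed to `SenderRecord.create` as correlation metadata plus any per-record send exception. A sketch of inspecting those results instead of discarding them with a bare `subscribe()` (the surrounding class and method are hypothetical):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

class SendWithResults {

    // Hypothetical helper: the same per-record send as above, but inspecting the
    // SenderResult for each record instead of discarding it with a bare subscribe().
    static void send(KafkaSender<Object, Object> kafkaSender, String topic, Object value, String requestId) {
        ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, value);
        SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, requestId);

        kafkaSender.send(Mono.just(senderRecord))
                   .subscribe(
                           (SenderResult<Object> result) -> {
                               // correlationMetadata() returns the requestId passed to SenderRecord.create
                               if (result.exception() != null) {
                                   System.err.println("Send failed for " + result.correlationMetadata()
                                           + ": " + result.exception());
                               }
                           },
                           error -> System.err.println("Send pipeline terminated: " + error));
    }
}
```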
With acks=0 and buffer.memory=33554432 (32 MB), the application sends roughly 3.15K messages per second (MPS) through the KafkaSender and send API above, against two Kafka broker pods. Scaling down the first broker pod initially had no observable impact. However, once the second broker pod was also scaled down, the application pod was restarted by an Out of Memory (OOM) kill with exit code 137. Even though the producer's buffer memory is limited to 32 MB, the Kafka producer exhausted the entire 3 GB heap and crashed the application.
Expected Behavior
The application is expected to handle exceptions gracefully, preventing the complete depletion of heap memory.
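As an illustration of what graceful handling could look like from the caller's side, here is a sketch of bounding the send pipeline and reacting to failures; the maxInFlight value, retry policy, and surrounding class are assumptions, and this is only a possible mitigation, not a claim about how the library currently behaves nor a fix for the heap growth reported above:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.util.retry.Retry;

class BoundedSenderSketch {

    // Sketch only: caller-side settings that might limit how much data queues up in the
    // reactive send pipeline while brokers are unavailable. All values are assumptions.
    static KafkaSender<Object, Object> createBoundedSender(SenderOptions<Object, Object> senderOptions) {
        return KafkaSender.create(senderOptions
                .maxInFlight(256)    // cap records in flight in the sender pipeline (assumed value)
                .stopOnError(true)); // terminate the send Flux on the first failure instead of continuing
    }

    static void sendAll(KafkaSender<Object, Object> kafkaSender,
                        Flux<SenderRecord<Object, Object, Object>> records) {
        kafkaSender.send(records)
                   .doOnError(e -> System.err.println("Send pipeline failed: " + e))
                   // Note: retrying resubscribes to `records`, so duplicate sends are possible with acks=0.
                   .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                   .subscribe();
    }
}
```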
Steps to Reproduce
Kafka Sender

```java
KafkaSender<Object, Object> kafkaSender;
```

Create sender options and update kafkaSender:

```java
this.senderOptions = SenderOptions.<Object, Object>create(configurationProperties);
this.kafkaSender = KafkaSender.create(senderOptions);
```

Producer Record

```java
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, value);
```

Sender Record

```java
SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, requestId);
```

Send API

```java
kafkaSender.send(Mono.just(senderRecord)).subscribe();
```
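Putting these steps together, the empty reproCase() stub that follows could be filled in along the following lines (bootstrap servers, topic, payloads, message rate, serializers, and JUnit 5 are assumptions; the broker pods are scaled down externally while the test runs):

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

class OomReproTest {

    @Test
    void reproCase() {
        Map<String, Object> configurationProperties = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",               // placeholder
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,  // assumption
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.ACKS_CONFIG, "0",                                     // from this report
                ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);                     // 32 MB, from this report

        SenderOptions<Object, Object> senderOptions = SenderOptions.create(configurationProperties);
        KafkaSender<Object, Object> kafkaSender = KafkaSender.create(senderOptions);

        // Emit a steady stream of per-record sends, mirroring the send-and-subscribe pattern above.
        // While this runs, scale the broker pods down externally and observe the application heap.
        Flux.interval(Duration.ofMillis(1))
            .take(Duration.ofMinutes(5))
            .doOnNext(i -> {
                ProducerRecord<Object, Object> record = new ProducerRecord<>("test-topic", "payload-" + i);
                SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, "request-" + i);
                kafkaSender.send(Mono.just(senderRecord)).subscribe();
            })
            .blockLast();
    }
}
```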
```java
@Test
void reproCase() {
}
```

Possible Solution
Your Environment
- Reactor version(s) used: 1.3.22
- Other relevant libraries versions (e.g. netty, ...): apache kafka
- JVM version (`java -version`): 17.0.10
- OS and version (e.g. `uname -a`): Linux