NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information#10852
NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information#10852lkuchars wants to merge 1 commit intoapache:mainfrom
Conversation
pvillard31
left a comment
There was a problem hiding this comment.
Warning: src/main/java/org/apache/nifi/kafka/service/api/common/PartitionState.java:[33,19] (whitespace) WhitespaceAfter: ',' is not followed by whitespace.
Warning: src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[25,19] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
Warning: src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[33,30] (whitespace) WhitespaceAround: '{' is not preceded with whitespace.
Warning: src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[37,42] (whitespace) WhitespaceAround: '{' is not preceded with whitespace.
Please make sure that
mvn clean install -Pcontrib-check -Pintegration-tests
is passing from within the nifi-kafka-bundle directory.
I'd also suggest adding an integration test for completeness.
exceptionfactory
left a comment
There was a problem hiding this comment.
Thanks for working on this addition @lkuchars.
On initial review, I'm not sure that introducing ConsumerMetrics to the PartitionState class is the best approach. The PartitionState class is also used in the Kafka Producer Service, where Consumer Metrics never apply.
Instead, it seems better to add a new method to the KafkaConsumerService interface, taking a TopicPartitionSummary and returning the OptionalLong of current lag. There is a balance between simply reproducing Kafka Client methods, but in this case, it seems better to keep the methods distinct.
| this.currentLag = OptionalLong.empty(); | ||
| } | ||
|
|
||
| public static ConsumerMetrics empty(){ |
There was a problem hiding this comment.
This method seems unnecessary, as on the one hand, this class is just a wrapper for the currentLag. If the goal is the support other metrics in the future, it may be less clear what empty means.
There was a problem hiding this comment.
Yes, there are lots of other metrics on the consumer group that I had in mind for later.
Good point @exceptionfactory. I agree that the new method would be a better fit. But, Instead of using the method to return just the lag I would call this WDYT? |
|
Thanks for the reply @lkuchars. I think some additional consideration is needed for those other metrics. In particular, evaluating options for plugging in an implementation that integrates more closely with Kafka's available telemetry capabilities. That is worth considering in a separate Jira issue. For now, it seems best to proceed with the narrow additional of support for current lag, as it aligns with available Kafka Consumer methods. |
7da5c49 to
0ce8582
Compare
@pvillard31 Thanks for looking at the changes. Sorry for that. I forgot about contrib-check profile. I rarely push sth here. Also, I've added the ITest on the first push. |
NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information
Summary
NIFI-15545
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation