Skip to content

NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information#10852

Open
lkuchars wants to merge 1 commit intoapache:mainfrom
lkuchars:NIFI-15545-extend-get-partition-states
Open

NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information#10852
lkuchars wants to merge 1 commit intoapache:mainfrom
lkuchars:NIFI-15545-extend-get-partition-states

Conversation

@lkuchars
Copy link
Contributor

@lkuchars lkuchars commented Feb 4, 2026

NIFI-15545 Extend KafkaConsumerService.getPartitionStates() to return lag information

Summary

NIFI-15545

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Copy link
Contributor

@pvillard31 pvillard31 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning:  src/main/java/org/apache/nifi/kafka/service/api/common/PartitionState.java:[33,19] (whitespace) WhitespaceAfter: ',' is not followed by whitespace.
Warning:  src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[25,19] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
Warning:  src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[33,30] (whitespace) WhitespaceAround: '{' is not preceded with whitespace.
Warning:  src/main/java/org/apache/nifi/kafka/service/api/common/ConsumerMetrics.java:[37,42] (whitespace) WhitespaceAround: '{' is not preceded with whitespace.

Please make sure that

mvn clean install -Pcontrib-check -Pintegration-tests

is passing from within the nifi-kafka-bundle directory.

I'd also suggest adding an integration test for completeness.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this addition @lkuchars.

On initial review, I'm not sure that introducing ConsumerMetrics to the PartitionState class is the best approach. The PartitionState class is also used in the Kafka Producer Service, where Consumer Metrics never apply.

Instead, it seems better to add a new method to the KafkaConsumerService interface, taking a TopicPartitionSummary and returning the OptionalLong of current lag. There is a balance between simply reproducing Kafka Client methods, but in this case, it seems better to keep the methods distinct.

this.currentLag = OptionalLong.empty();
}

public static ConsumerMetrics empty(){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems unnecessary, as on the one hand, this class is just a wrapper for the currentLag. If the goal is the support other metrics in the future, it may be less clear what empty means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are lots of other metrics on the consumer group that I had in mind for later.

@lkuchars
Copy link
Contributor Author

lkuchars commented Feb 5, 2026

Instead, it seems better to add a new method to the KafkaConsumerService interface, taking a TopicPartitionSummary and returning the OptionalLong of current lag. There is a balance between simply reproducing Kafka Client methods, but in this case, it seems better to keep the methods distinct.

Good point @exceptionfactory. I agree that the new method would be a better fit. But, Instead of using the method to return just the lag I would call this metrics() or kafkaMetrics(), it has a corresponding method in the KafkaConsumer implementation. The new method would return a new object of class Metrics, and for now, the metrics returned there would be lag, bytes received, throughput, just a few useful ones. KafkaConsumer collects/computes metrics on poll.
Whole list is available here https://kafka.apache.org/41/operations/monitoring/#common-monitoring-metrics-for-producerconsumerconnectstreams

WDYT?

@exceptionfactory
Copy link
Contributor

Thanks for the reply @lkuchars.

I think some additional consideration is needed for those other metrics. In particular, evaluating options for plugging in an implementation that integrates more closely with Kafka's available telemetry capabilities. That is worth considering in a separate Jira issue.

For now, it seems best to proceed with the narrow additional of support for current lag, as it aligns with available Kafka Consumer methods.

@lkuchars lkuchars force-pushed the NIFI-15545-extend-get-partition-states branch from 7da5c49 to 0ce8582 Compare February 5, 2026 22:36
@lkuchars
Copy link
Contributor Author

lkuchars commented Feb 5, 2026

Please make sure that

mvn clean install -Pcontrib-check -Pintegration-tests

@pvillard31 Thanks for looking at the changes. Sorry for that. I forgot about contrib-check profile. I rarely push sth here. Also, I've added the ITest on the first push.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants