Projects
Eulaceura:Factory
kafka
_service:obs_scm:0006-NPE-subscriptionState.patch
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:0006-NPE-subscriptionState.patch of Package kafka
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 30491110a3..ce81aa1b95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -539,10 +539,13 @@ public class SubscriptionState { synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { TopicPartitionState topicPartitionState = assignedState(tp); - if (isolationLevel == IsolationLevel.READ_COMMITTED) + if (topicPartitionState.position == null) { + return null; + } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; - else + } else { return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset; + } } synchronized Long partitionLead(TopicPartition tp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index d6e88008b5..d19234fe8a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; @@ -794,4 +795,18 @@ public class SubscriptionStateTest { assertFalse(state.isOffsetResetNeeded(tp0)); } + @Test + public void nullPositionLagOnNoPosition() { + state.assignFromUser(Collections.singleton(tp0)); + + assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); + assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); + + state.updateHighWatermark(tp0, 1L); + state.updateLastStableOffset(tp0, 1L); + + assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); + assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); + } + }
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2