Projects
Eulaceura:Factory
kafka
_service:obs_scm:0013-AlterIsr.patch
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:0013-AlterIsr.patch of Package kafka
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a58f4238ff..88b337311d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition, leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) zkVersion = partitionState.zkVersion - // Clear any pending AlterIsr requests and check replica state - alterIsrManager.clearPending(topicPartition) - // In the case of successive leader elections in a short time period, a follower may have // entries in its log from a later epoch than any entry in the new leader's log. In order // to ensure that these followers can truncate to the right offset, we must cache the new @@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition, leaderEpochStartOffsetOpt = None zkVersion = partitionState.zkVersion - // Since we might have been a leader previously, still clear any pending AlterIsr requests - alterIsrManager.clearPending(topicPartition) - if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) { false } else { @@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition, isrState = proposedIsrState if (!alterIsrManager.submit(alterIsrItem)) { - // If the ISR manager did not accept our update, we need to revert back to previous state + // If the ISR manager did not accept our update, we need to revert the proposed state. + // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or + // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight. isrState = oldState isrChangeListener.markFailed() - throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition") + warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition") + } else { + debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState") } - - debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState") } /** diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 9ad734f708..1059a3df3e 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -49,8 +49,6 @@ trait AlterIsrManager { def shutdown(): Unit = {} def submit(alterIsrItem: AlterIsrItem): Boolean - - def clearPending(topicPartition: TopicPartition): Unit } case class AlterIsrItem(topicPartition: TopicPartition, @@ -134,9 +132,6 @@ class DefaultAlterIsrManager( enqueued } - override def clearPending(topicPartition: TopicPartition): Unit = { - unsentIsrUpdates.remove(topicPartition) - } private[server] def maybePropagateIsrChanges(): Unit = { // Send all pending items if there is not already a request in-flight. diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala index 2d88aac6b4..8dffcdf307 100644 --- a/core/src/main/scala/kafka/server/ZkIsrManager.scala +++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala @@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS) } - override def clearPending(topicPartition: TopicPartition): Unit = { - // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to - // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK - // has already happened, so we may as well send the notification to the controller. - } - override def submit(alterIsrItem: AlterIsrItem): Boolean = { debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " + s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}") diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 5eedb63ae5..4dbd735753 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -18,10 +18,10 @@ package kafka.controller import java.util.Properties -import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue} - +import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit} import com.yammer.metrics.core.Timer import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr} +import kafka.controller.KafkaController.AlterIsrCallback import kafka.metrics.KafkaYammerMetrics import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{LogCaptureAppender, TestUtils} @@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { latch.await() } + @Test + def testAlterIsrErrors(): Unit = { + servers = makeServers(1) + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(controllerId)) + TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) + val controller = getController().kafkaController + var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1, + Map(tp -> LeaderAndIsr(controllerId, List(controllerId)))) + var capturedError = future.get(5, TimeUnit.SECONDS) + assertEquals(Errors.STALE_BROKER_EPOCH, capturedError) + + future = captureAlterIsrError(99, controller.brokerEpoch, + Map(tp -> LeaderAndIsr(controllerId, List(controllerId)))) + capturedError = future.get(5, TimeUnit.SECONDS) + assertEquals(Errors.STALE_BROKER_EPOCH, capturedError) + + val unknownTopicPartition = new TopicPartition("unknown", 99) + future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch, + Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition) + capturedError = future.get(5, TimeUnit.SECONDS) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError) + + future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch, + Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp) + capturedError = future.get(5, TimeUnit.SECONDS) + assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError) + } + + def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = { + val future = new CompletableFuture[Errors]() + val controller = getController().kafkaController + val callback: AlterIsrCallback = { + case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) => + future.completeExceptionally(new AssertionError(s"Should have seen top-level error")) + case Right(error: Errors) => + future.complete(error) + } + controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback)) + future + } + + def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = { + val future = new CompletableFuture[Errors]() + val controller = getController().kafkaController + val callback: AlterIsrCallback = { + case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) => + partitionResults.get(tp) match { + case Some(Left(error: Errors)) => future.complete(error) + case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result")) + case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result")) + } + case Right(_: Errors) => + future.completeExceptionally(new AssertionError(s"Should not seen top-level error")) + } + controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback)) + future + } + + @Test def testTopicIdsAreAdded(): Unit = { servers = makeServers(1) diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index 1074fd3157..1c8c81471f 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -70,8 +70,10 @@ class AlterIsrManagerTest { @Test def testOverwriteWithinBatch(): Unit = { val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]() + val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() + EasyMock.expect(brokerToController.start()) - EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once() + EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2) EasyMock.replay(brokerToController) val scheduler = new MockScheduler(time) @@ -81,11 +83,21 @@ class AlterIsrManagerTest { // Only send one ISR update for a given topic+partition assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))) assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0))) + + // Simulate response + val alterIsrResp = partitionResponse(tp0, Errors.NONE) + val resp = new ClientResponse(null, null, "", 0L, 0L, + false, null, null, alterIsrResp) + callbackCapture.getValue.onComplete(resp) + + // Now we can submit this partition again + assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0))) EasyMock.verify(brokerToController) + // Make sure we sent the right request ISR={1} val request = capture.getValue.build() assertEquals(request.data().topics().size(), 1) - assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3) + assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1) } @Test diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 43df2b97f4..8e52007bc7 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1106,10 +1106,6 @@ object TestUtils extends Logging { } } - override def clearPending(topicPartition: TopicPartition): Unit = { - inFlight.set(false); - } - def completeIsrUpdate(newZkVersion: Int): Unit = { if (inFlight.compareAndSet(true, false)) { val item = isrUpdates.head
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2