Projects
Eulaceura:Factory
kafka
_service:obs_scm:0017-fix-log-clean.patch
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:0017-fix-log-clean.patch of Package kafka
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 7a8a13c6e7..177b460d38 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int, logSize + segs.head.size <= maxSize && indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize && timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && - lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { + //if first segment size is 0, we don't need to do the index offset range check. + //this will avoid empty log left every 2^31 message. + (segs.head.size == 0 || + lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) { group = segs.head :: group logSize += segs.head.size indexSize += segs.head.offsetIndex.sizeInBytes diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 43bc3b9f28..e5984c4f31 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1258,6 +1258,53 @@ class LogCleanerTest { "All but the last group should be the target size.") } + @Test + def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={ + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + + val k="key".getBytes() + val v="val".getBytes() + + //create 3 segments + for(i <- 0 until 3){ + log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) + //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment + val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 ) + log.appendAsFollower(records) + assertEquals(i + 1, log.numberOfSegments) + } + + //4th active segment, not clean + log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) + + val totalSegments = 4 + //last segment not cleanable + val firstUncleanableOffset = log.logEndOffset - 1 + val notCleanableSegments = 1 + + assertEquals(totalSegments, log.numberOfSegments) + var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset) + //because index file uses 4 byte relative index offset and current segments all none empty, + //segments will not group even their size is very small. + assertEquals(totalSegments - notCleanableSegments, groups.size) + //do clean to clean first 2 segments to empty + cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset)) + assertEquals(totalSegments, log.numberOfSegments) + assertEquals(0, log.logSegments.head.size) + + //after clean we got 2 empty segment, they will group together this time + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset) + val noneEmptySegment = 1 + assertEquals(noneEmptySegment + 1, groups.size) + + //trigger a clean and 2 empty segments should cleaned to 1 + cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset)) + assertEquals(totalSegments - 1, log.numberOfSegments) + } + + /** * Validate the logic for grouping log segments together for cleaning when only a small number of * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2