Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch018-backport-enhancement-...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch018-backport-enhancement-of-tiered-storage.patch of Package rocketmq
From 1a8e7cb17cb29ed33b0196b52e452a6e76ade781 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 12 Sep 2023 19:33:41 +0800 Subject: [PATCH 1/5] [ISSUE #7345] Fix wrong result of searchOffset in tiered storage --- .../tieredstore/file/TieredFlatFile.java | 5 +- .../tieredstore/file/TieredFlatFileTest.java | 46 +++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index 426c4e09d..d973179ee 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -365,7 +365,10 @@ public class TieredFlatFile { if (!segmentList.isEmpty()) { return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1); } - return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1); + if (fileSegmentList.isEmpty()) { + return null; + } + return boundaryType == BoundaryType.UPPER ? fileSegmentList.get(fileSegmentList.size() - 1) : fileSegmentList.get(0); } finally { fileSegmentLock.readLock().unlock(); } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java index 7a4d05969..7e2fbf201 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java @@ -16,10 +16,7 @@ */ package org.apache.rocketmq.tieredstore.file; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.FileSegmentType; @@ -35,6 +32,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + public class TieredFlatFileTest { private final String storePath = TieredStoreTestUtil.getRandomStorePath(); @@ -301,4 +303,40 @@ public class TieredFlatFileTest { fileQueue.rollingNewFile(); Assert.assertEquals(2, fileQueue.getFileSegmentCount()); } + + @Test + public void testGetFileByTime() { + String filePath = TieredStoreUtil.toPath(queue); + TieredFlatFile tieredFlatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); + TieredFileSegment fileSegment1 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig); + fileSegment1.setMinTimestamp(100); + fileSegment1.setMaxTimestamp(200); + + TieredFileSegment fileSegment2 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig); + fileSegment2.setMinTimestamp(200); + fileSegment2.setMaxTimestamp(300); + + tieredFlatFile.getFileSegmentList().add(fileSegment1); + tieredFlatFile.getFileSegmentList().add(fileSegment2); + + TieredFileSegment segmentUpper = tieredFlatFile.getFileByTime(400, BoundaryType.UPPER); + Assert.assertEquals(fileSegment2, segmentUpper); + + TieredFileSegment segmentLower = tieredFlatFile.getFileByTime(400, BoundaryType.LOWER); + Assert.assertEquals(fileSegment2, segmentLower); + + + TieredFileSegment segmentUpper2 = tieredFlatFile.getFileByTime(0, BoundaryType.UPPER); + Assert.assertEquals(fileSegment1, segmentUpper2); + + TieredFileSegment segmentLower2 = tieredFlatFile.getFileByTime(0, BoundaryType.LOWER); + Assert.assertEquals(fileSegment1, segmentLower2); + + + TieredFileSegment segmentUpper3 = tieredFlatFile.getFileByTime(200, BoundaryType.UPPER); + Assert.assertEquals(fileSegment1, segmentUpper3); + + TieredFileSegment segmentLower3 = tieredFlatFile.getFileByTime(200, BoundaryType.LOWER); + Assert.assertEquals(fileSegment2, segmentLower3); + } } -- 2.32.0.windows.2 From fd32dae2ab59f86dd215eeec405bf4fa6212bcb3 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 12 Sep 2023 19:58:08 +0800 Subject: [PATCH 2/5] [ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342) --- .../tieredstore/file/TieredFlatFile.java | 53 +++++++------------ .../file/TieredFlatFileManager.java | 10 ++-- 2 files changed, 22 insertions(+), 41 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index d973179ee..d96eb6e8f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.tieredstore.file; -import com.alibaba.fastjson.JSON; import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,13 +24,13 @@ import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.tieredstore.common.AppendResult; @@ -43,7 +42,6 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; -import org.apache.rocketmq.common.BoundaryType; public class TieredFlatFile { @@ -177,7 +175,10 @@ public class TieredFlatFile { } } - private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) { + /** + * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full + */ + public void updateFileSegment(TieredFileSegment fileSegment) { FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); @@ -186,45 +187,24 @@ public class TieredFlatFile { if (metadata == null) { metadata = new FileSegmentMetadata( this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); - metadata.setCreateTimestamp(fileSegment.getMinTimestamp()); - metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); - metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - if (fileSegment.isClosed()) { - metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); - } - this.tieredMetadataStore.updateFileSegment(metadata); + metadata.setCreateTimestamp(System.currentTimeMillis()); } - return metadata; - } - - /** - * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full - */ - public void updateFileSegment(TieredFileSegment fileSegment) { - FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment); - if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW - && fileSegment.isFull() - && !fileSegment.needCommit()) { + metadata.setSize(fileSegment.getCommitPosition()); + metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); + metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - segmentMetadata.markSealed(); + if (fileSegment.isFull() && !fileSegment.needCommit()) { + if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) { + metadata.markSealed(); + } } if (fileSegment.isClosed()) { - segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); } - segmentMetadata.setSize(fileSegment.getCommitPosition()); - segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - - FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( - this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); - - if (!Objects.equals(metadata, segmentMetadata)) { - this.tieredMetadataStore.updateFileSegment(segmentMetadata); - logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}", - segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata)); - } + this.tieredMetadataStore.updateFileSegment(metadata); } private void checkAndFixFileSize() { @@ -598,6 +578,9 @@ public class TieredFlatFile { logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e); } fileSegment.destroyFile(); + if (!fileSegment.exists()) { + tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset()); + } } fileSegmentList.clear(); needCommitFileSegmentList.clear(); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index 7c744af3b..087ea8c9c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -136,15 +136,13 @@ public class TieredFlatFileManager { TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { - flatFile.getCompositeFlatFileLock().lock(); try { + flatFile.getCompositeFlatFileLock().lock(); flatFile.cleanExpiredFile(expiredTimeStamp); flatFile.destroyExpiredFile(); - if (flatFile.getConsumeQueueBaseOffset() == -1) { - logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", - flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); - destroyCompositeFile(flatFile.getMessageQueue()); - } + } catch (Throwable t) { + logger.error("Do Clean expired file error, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t); } finally { flatFile.getCompositeFlatFileLock().unlock(); } -- 2.32.0.windows.2 From 4a8e0d5b851d1f9573cda79b7d2e42ee498809da Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Wed, 13 Sep 2023 16:08:03 +0800 Subject: [PATCH 3/5] [ISSUE #7351] Allow mqadmin to operate slave nodes Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../processor/AdminBrokerProcessor.java | 12 -- .../processor/AdminBrokerProcessorTest.java | 106 ------------------ 2 files changed, 118 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 8fbcd3c94..9e48431be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - if (validateSlave(response)) { - return response; - } final CreateTopicRequestHeader requestHeader = (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); @@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - if (validateSlave(response)) { - return response; - } DeleteTopicRequestHeader requestHeader = (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); @@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - if (validateSlave(response)) { - return response; - } LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - if (validateSlave(response)) { - return response; - } DeleteSubscriptionGroupRequestHeader requestHeader = (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 9d17011b6..ec252cece 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -76,7 +76,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.stats.BrokerStats; @@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } - @Test - public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception { - if (notToBeExecuted()) { - return; - } - initRocksdbTopicManager(); - testUpdateAndCreateTopicOnSlave(); - } - - @Test - public void testUpdateAndCreateTopicOnSlave() throws Exception { - // setup - MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); - when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); - defaultMessageStore = mock(DefaultMessageStore.class); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - - // test on slave - String topic = "TEST_CREATE_TOPIC"; - RemotingCommand request = buildCreateTopicRequest(topic); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + - "please execute it from master broker."); - } - @Test public void testDeleteTopicInRocksdb() throws Exception { if (notToBeExecuted()) { @@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } - @Test - public void testDeleteTopicOnSlaveInRocksdb() throws Exception { - if (notToBeExecuted()) { - return; - } - initRocksdbTopicManager(); - testDeleteTopicOnSlave(); - } - - @Test - public void testDeleteTopicOnSlave() throws Exception { - // setup - MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); - when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); - defaultMessageStore = mock(DefaultMessageStore.class); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - - String topic = "TEST_DELETE_TOPIC"; - RemotingCommand request = buildDeleteTopicRequest(topic); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + - "please execute it from master broker."); - } - @Test public void testDeleteWithPopRetryTopic() throws Exception { String topic = "topicA"; @@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } - @Test - public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception { - initRocksdbSubscriptionManager(); - testUpdateAndCreateSubscriptionGroupOnSlave(); - } - - @Test - public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException { - // Setup - MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); - when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); - defaultMessageStore = mock(DefaultMessageStore.class); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - - // Test - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setBrokerId(1); - subscriptionGroupConfig.setGroupName("groupId"); - subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE); - subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE); - subscriptionGroupConfig.setRetryMaxTimes(111); - subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE); - request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes()); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + - "please execute it from master broker."); - } - @Test public void testGetAllSubscriptionGroupInRocksdb() throws Exception { initRocksdbSubscriptionManager(); @@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } - @Test - public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception { - initRocksdbSubscriptionManager(); - testDeleteSubscriptionGroupOnSlave(); - } - - @Test - public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException { - // Setup - MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); - when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); - defaultMessageStore = mock(DefaultMessageStore.class); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - - // Test - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null); - request.addExtField("groupName", "GID-Group-Name"); - request.addExtField("removeOffset", "true"); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + - "please execute it from master broker."); - } - @Test public void testGetTopicStatsInfo() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null); -- 2.32.0.windows.2 From 831fcc76cd7cd362bb6c136c287c624bb7eaf40a Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 19 Sep 2023 10:04:04 +0800 Subject: [PATCH 4/5] [ISSUE #7363] Fix get message from tiered storage return incorrect next pull offset (#7365) --- .../tieredstore/TieredMessageFetcher.java | 2 +- .../tieredstore/TieredMessageStore.java | 29 ++++++++++--------- .../tieredstore/TieredMessageStoreTest.java | 5 ++-- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index 766ff64f6..c948fa3fa 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -319,7 +319,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher { } // if cache is miss, immediately pull messages - LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + + LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + "topic: {}, queue: {}, queue offset: {}, max message num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 9fb1b2f01..d7d13d61e 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -147,6 +147,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore { public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { + // For system topic, force reading from local store + if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { + return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); } else { @@ -158,6 +163,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { return fetcher .getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter) .thenApply(result -> { + Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE) .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) @@ -166,8 +172,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL || - result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE || - result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) { + result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); @@ -178,14 +183,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore { } } - // Fetch system topic data from the broker when using the force level. - if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { - if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { - return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); - } - } - if (result.getStatus() != GetMessageStatus.FOUND && + result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE && result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE && result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) { logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " + @@ -206,10 +205,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore { if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { result.setMinOffset(minOffsetInQueue); } - long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId); - if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) { - result.setMaxOffset(maxOffsetInQueue); - } + + // In general, the local cq offset is slightly greater than the commit offset in read message, + // so there is no need to update the maximum offset to the local cq offset here, + // otherwise it will cause repeated consumption after next begin offset over commit offset. + + logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}", + group, topic, queueId, offset, maxMsgNums, result); + return result; }).exceptionally(e -> { logger.error("GetMessageAsync from tiered store failed", e); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 2451199c2..07af1fc8b 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -168,7 +168,7 @@ public class TieredMessageStoreTest { GetMessageResult result1 = new GetMessageResult(); result1.setStatus(GetMessageStatus.FOUND); GetMessageResult result2 = new GetMessageResult(); - result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING); + result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(result1)); when(nextStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(result2); @@ -188,7 +188,8 @@ public class TieredMessageStoreTest { properties.setProperty("tieredStorageLevel", "3"); configuration.update(properties); when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); - Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null)); + Assert.assertEquals(result2.getStatus(), + store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null).getStatus()); } @Test -- 2.32.0.windows.2 From f05a8da760dfade411ad56ef874f477988479cf9 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Wed, 20 Sep 2023 15:06:21 +0800 Subject: [PATCH 5/5] Print admin queue watermark in log (#7372) --- .../main/java/org/apache/rocketmq/broker/BrokerController.java | 1 + 1 file changed, 1 insertion(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 13a3feb4e..53e2e1b62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1182,6 +1182,7 @@ public class BrokerController { LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue)); LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue)); LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue)); + LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue)); } public MessageStore getMessageStore() { -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2