Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch010-backport-add-some-fix...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch010-backport-add-some-fixes.patch of Package rocketmq
From b2deef179dbc6a9eb1a2b6dd7b652d95cb768295 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Thu, 10 Aug 2023 10:38:47 +0800 Subject: [PATCH 01/12] [ISSUE #7144] Accelerate the recovery speed of the tiered storage module (#7145) --- .../tieredstore/TieredDispatcher.java | 3 + .../tieredstore/TieredMessageStore.java | 2 +- .../common/TieredStoreExecutor.java | 25 ++-- .../tieredstore/file/CompositeFlatFile.java | 15 +- .../file/CompositeQueueFlatFile.java | 20 ++- .../tieredstore/file/TieredCommitLog.java | 24 +++- .../tieredstore/file/TieredFlatFile.java | 42 +++--- .../file/TieredFlatFileManager.java | 135 ++++++++++-------- .../metadata/FileSegmentMetadata.java | 26 +++- .../tieredstore/TieredDispatcherTest.java | 15 +- .../tieredstore/TieredMessageFetcherTest.java | 2 +- .../file/CompositeQueueFlatFileTest.java | 2 +- .../file/TieredFlatFileManagerTest.java | 7 +- 13 files changed, 194 insertions(+), 124 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index bb58ea7dd..1746190cd 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue); ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId); + logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq range={}-{}, dispatch offset={}-{}", + topic, queueId, minOffsetInQueue, maxOffsetInQueue, dispatchOffset, upperBound - 1); + for (; dispatchOffset < upperBound; dispatchOffset++) { // get consume queue SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 1f12410f2..ced1fb818 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -147,7 +147,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) { - logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); + logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java index 6eb3478b3..6dd0e8846 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java @@ -43,18 +43,9 @@ public class TieredStoreExecutor { public static ExecutorService compactIndexFileExecutor; public static void init() { - dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); - dispatchExecutor = new ThreadPoolExecutor( - Math.max(2, Runtime.getRuntime().availableProcessors()), - Math.max(16, Runtime.getRuntime().availableProcessors() * 4), - 1000 * 60, - TimeUnit.MILLISECONDS, - dispatchThreadPoolQueue, - new ThreadFactoryImpl("TieredCommonExecutor_")); - commonScheduledExecutor = new ScheduledThreadPoolExecutor( Math.max(4, Runtime.getRuntime().availableProcessors()), - new ThreadFactoryImpl("TieredCommonScheduledExecutor_")); + new ThreadFactoryImpl("TieredCommonExecutor_")); commitExecutor = new ScheduledThreadPoolExecutor( Math.max(16, Runtime.getRuntime().availableProcessors() * 4), @@ -62,7 +53,17 @@ public class TieredStoreExecutor { cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor( Math.max(4, Runtime.getRuntime().availableProcessors()), - new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_")); + new ThreadFactoryImpl("TieredCleanFileExecutor_")); + + dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + dispatchExecutor = new ThreadPoolExecutor( + Math.max(2, Runtime.getRuntime().availableProcessors()), + Math.max(16, Runtime.getRuntime().availableProcessors() * 4), + 1000 * 60, + TimeUnit.MILLISECONDS, + dispatchThreadPoolQueue, + new ThreadFactoryImpl("TieredDispatchExecutor_"), + new ThreadPoolExecutor.DiscardOldestPolicy()); fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); fetchDataExecutor = new ThreadPoolExecutor( @@ -71,7 +72,7 @@ public class TieredStoreExecutor { 1000 * 60, TimeUnit.MILLISECONDS, fetchDataThreadPoolQueue, - new ThreadFactoryImpl("TieredFetchDataExecutor_")); + new ThreadFactoryImpl("TieredFetchExecutor_")); compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); compactIndexFileExecutor = new ThreadPoolExecutor( diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java index df4baf33f..5ad3a6ff3 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java @@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess { this.storeConfig = fileQueueFactory.getStoreConfig(); this.readAheadFactor = this.storeConfig.getReadAheadMinFactor(); this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig); - this.dispatchOffset = new AtomicLong(); this.compositeFlatFileLock = new ReentrantLock(); this.inFlightRequestMap = new ConcurrentHashMap<>(); this.commitLog = new TieredCommitLog(fileQueueFactory, filePath); this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath); + this.dispatchOffset = new AtomicLong( + this.consumeQueue.isInitialized() ? this.getConsumeQueueCommitOffset() : -1L); this.groupOffsetCache = this.initOffsetCache(); } - protected void recoverMetadata() { - if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) { - consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); - } - } - private Cache<String, Long> initOffsetCache() { return Caffeine.newBuilder() .expireAfterWrite(2, TimeUnit.MINUTES) @@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess { @Override public void initOffset(long offset) { - if (!consumeQueue.isInitialized()) { + if (consumeQueue.isInitialized()) { + dispatchOffset.set(this.getConsumeQueueCommitOffset()); + } else { consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + dispatchOffset.set(offset); } - dispatchOffset.set(offset); } @Override diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java index f6c0afed0..0a797f465 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java @@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) { super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue)); this.messageQueue = messageQueue; - this.recoverTopicMetadata(); - super.recoverMetadata(); + this.recoverQueueMetadata(); this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig); } @@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { if (!consumeQueue.isInitialized()) { queueMetadata.setMinOffset(offset); queueMetadata.setMaxOffset(offset); + metadataStore.updateQueue(queueMetadata); } super.initOffset(offset); } - public void recoverTopicMetadata() { + public void recoverQueueMetadata() { TopicMetadata topicMetadata = this.metadataStore.getTopic(messageQueue.getTopic()); if (topicMetadata == null) { topicMetadata = this.metadataStore.addTopic(messageQueue.getTopic(), -1L); @@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) { queueMetadata.setMaxOffset(queueMetadata.getMinOffset()); } - this.dispatchOffset.set(queueMetadata.getMaxOffset()); } - public void persistMetadata() { + public void flushMetadata() { try { - if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) { - return; - } - queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + queueMetadata.setMinOffset(super.getConsumeQueueMinOffset()); + queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset()); metadataStore.updateQueue(queueMetadata); } catch (Exception e) { - LOGGER.error("CompositeFlatFile#flushMetadata: update queue metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e); + LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, queue: {}", + messageQueue.getTopic(), messageQueue.getQueueId(), e); } } @@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { @Override public void shutdown() { super.shutdown(); - metadataStore.updateQueue(queueMetadata); + this.flushMetadata(); } @Override diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java index 80e1bce50..0e5f79132 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java @@ -50,7 +50,7 @@ public class TieredCommitLog { this.storeConfig = fileQueueFactory.getStoreConfig(); this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET); - this.correctMinOffset(); + this.correctMinOffsetAsync(); } @VisibleForTesting @@ -91,17 +91,26 @@ public class TieredCommitLog { return flatFile.getFileToWrite().getMaxTimestamp(); } - public synchronized long correctMinOffset() { + public long correctMinOffset() { + try { + return correctMinOffsetAsync().get(); + } catch (Exception e) { + log.error("Correct min offset failed in clean expired file", e); + } + return NOT_EXIST_MIN_OFFSET; + } + + public synchronized CompletableFuture<Long> correctMinOffsetAsync() { if (flatFile.getFileSegmentCount() == 0) { this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); - return NOT_EXIST_MIN_OFFSET; + return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET); } // queue offset field length is 8 int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8; if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) { this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); - return NOT_EXIST_MIN_OFFSET; + return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET); } try { @@ -109,7 +118,8 @@ public class TieredCommitLog { .thenApply(buffer -> { long offset = MessageBufferUtil.getQueueOffset(buffer); minConsumeQueueOffset.set(offset); - log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}", + log.debug("Correct commitlog min cq offset success, " + + "filePath={}, min cq offset={}, commitlog range={}-{}", flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset()); return offset; }) @@ -117,11 +127,11 @@ public class TieredCommitLog { log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}", flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable); return minConsumeQueueOffset.get(); - }).get(); + }); } catch (Exception e) { log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e); } - return minConsumeQueueOffset.get(); + return CompletableFuture.completedFuture(minConsumeQueueOffset.get()); } public AppendResult append(ByteBuffer byteBuf) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index 75ce8d89f..426c4e09d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.tieredstore.file; +import com.alibaba.fastjson.JSON; import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -24,6 +25,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -178,32 +180,26 @@ public class TieredFlatFile { private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) { FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( - fileSegment.getPath(), fileSegment.getFileType(), fileSegment.getBaseOffset()); - - if (metadata != null) { - return metadata; - } + this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); // Note: file segment path may not the same as file base path, use base path here. - metadata = new FileSegmentMetadata( - this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); - - if (fileSegment.isClosed()) { - metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + if (metadata == null) { + metadata = new FileSegmentMetadata( + this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); + metadata.setCreateTimestamp(fileSegment.getMinTimestamp()); + metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); + metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); + if (fileSegment.isClosed()) { + metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + } + this.tieredMetadataStore.updateFileSegment(metadata); } - - metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); - metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - - // Submit to persist - this.tieredMetadataStore.updateFileSegment(metadata); return metadata; } /** * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full */ - @VisibleForTesting public void updateFileSegment(TieredFileSegment fileSegment) { FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment); @@ -219,9 +215,16 @@ public class TieredFlatFile { } segmentMetadata.setSize(fileSegment.getCommitPosition()); - segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp()); segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - this.tieredMetadataStore.updateFileSegment(segmentMetadata); + + FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( + this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); + + if (!Objects.equals(metadata, segmentMetadata)) { + this.tieredMetadataStore.updateFileSegment(segmentMetadata); + logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}", + segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata)); + } } private void checkAndFixFileSize() { @@ -257,6 +260,7 @@ public class TieredFlatFile { logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}", lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize); lastFile.initPosition(lastFileSize); + this.updateFileSegment(lastFile); } } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index aeca44b8c..e9ae4a5a5 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -16,16 +16,19 @@ */ package org.apache.rocketmq.tieredstore.file; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; public class TieredFlatFileManager { + private static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); private static volatile TieredFlatFileManager instance; @@ -44,7 +48,7 @@ public class TieredFlatFileManager { private final TieredMetadataStore metadataStore; private final TieredMessageStoreConfig storeConfig; private final TieredFileAllocator tieredFileAllocator; - private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> queueFlatFileMap; + private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> flatFileConcurrentMap; public TieredFlatFileManager(TieredMessageStoreConfig storeConfig) throws ClassNotFoundException, NoSuchMethodException { @@ -52,23 +56,20 @@ public class TieredFlatFileManager { this.storeConfig = storeConfig; this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); this.tieredFileAllocator = new TieredFileAllocator(storeConfig); - this.queueFlatFileMap = new ConcurrentHashMap<>(); + this.flatFileConcurrentMap = new ConcurrentHashMap<>(); this.doScheduleTask(); } public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeConfig) { - if (storeConfig == null) { + if (storeConfig == null || instance != null) { return instance; } - - if (instance == null) { - synchronized (TieredFlatFileManager.class) { - if (instance == null) { - try { - instance = new TieredFlatFileManager(storeConfig); - } catch (Exception e) { - logger.error("TieredFlatFileManager#getInstance: create flat file manager failed", e); - } + synchronized (TieredFlatFileManager.class) { + if (instance == null) { + try { + instance = new TieredFlatFileManager(storeConfig); + } catch (Exception e) { + logger.error("Construct FlatFileManager instance error", e); } } } @@ -88,7 +89,7 @@ public class TieredFlatFileManager { TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0)); indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath); } catch (Exception e) { - logger.error("TieredFlatFileManager#getIndexFile: create index file failed", e); + logger.error("Construct FlatFileManager indexFile error", e); } } } @@ -105,7 +106,7 @@ public class TieredFlatFileManager { flatFile.commitCommitLog(); } catch (Throwable e) { MessageQueue mq = flatFile.getMessageQueue(); - logger.error("commit commitLog periodically failed: topic: {}, queue: {}", + logger.error("Commit commitLog periodically failed: topic: {}, queue: {}", mq.getTopic(), mq.getQueueId(), e); } }, delay, TimeUnit.MILLISECONDS); @@ -114,7 +115,7 @@ public class TieredFlatFileManager { flatFile.commitConsumeQueue(); } catch (Throwable e) { MessageQueue mq = flatFile.getMessageQueue(); - logger.error("commit consumeQueue periodically failed: topic: {}, queue: {}", + logger.error("Commit consumeQueue periodically failed: topic: {}, queue: {}", mq.getTopic(), mq.getQueueId(), e); } }, delay, TimeUnit.MILLISECONDS); @@ -125,7 +126,7 @@ public class TieredFlatFileManager { indexFile.commit(true); } } catch (Throwable e) { - logger.error("commit indexFile periodically failed", e); + logger.error("Commit indexFile periodically failed", e); } }, 0, TimeUnit.MILLISECONDS); } @@ -160,7 +161,7 @@ public class TieredFlatFileManager { try { doCommit(); } catch (Throwable e) { - logger.error("commit flat file periodically failed: ", e); + logger.error("Commit flat file periodically failed: ", e); } }, 60, 60, TimeUnit.SECONDS); @@ -168,49 +169,73 @@ public class TieredFlatFileManager { try { doCleanExpiredFile(); } catch (Throwable e) { - logger.error("clean expired flat file failed: ", e); + logger.error("Clean expired flat file failed: ", e); } }, 30, 30, TimeUnit.SECONDS); } public boolean load() { + Stopwatch stopwatch = Stopwatch.createStarted(); try { - AtomicLong topicSequenceNumber = new AtomicLong(); - List<Future<?>> futureList = new ArrayList<>(); - queueFlatFileMap.clear(); - metadataStore.iterateTopic(topicMetadata -> { + flatFileConcurrentMap.clear(); + this.recoverSequenceNumber(); + this.recoverTieredFlatFile(); + logger.info("Message store recover end, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } catch (Exception e) { + long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + logger.info("Message store recover error, total cost={}ms", costTime); + BROKER_LOG.error("Message store recover error, total cost={}ms", costTime, e); + return false; + } + return true; + } + + public void recoverSequenceNumber() { + AtomicLong topicSequenceNumber = new AtomicLong(); + metadataStore.iterateTopic(topicMetadata -> { + if (topicMetadata != null && topicMetadata.getTopicId() > 0) { topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), topicMetadata.getTopicId())); - Future<?> future = TieredStoreExecutor.dispatchExecutor.submit(() -> { - if (topicMetadata.getStatus() != 0) { - return; - } + } + }); + metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet()); + } + + public void recoverTieredFlatFile() { + Semaphore semaphore = new Semaphore((int) (TieredStoreExecutor.QUEUE_CAPACITY * 0.75)); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + metadataStore.iterateTopic(topicMetadata -> { + try { + semaphore.acquire(); + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { - metadataStore.iterateQueue(topicMetadata.getTopic(), - queueMetadata -> getOrCreateFlatFileIfAbsent( - new MessageQueue(topicMetadata.getTopic(), - storeConfig.getBrokerName(), - queueMetadata.getQueue().getQueueId()))); + Stopwatch subWatch = Stopwatch.createStarted(); + if (topicMetadata.getStatus() != 0) { + return; + } + AtomicLong queueCount = new AtomicLong(); + metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> { + this.getOrCreateFlatFileIfAbsent(new MessageQueue(topicMetadata.getTopic(), + storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId())); + queueCount.incrementAndGet(); + }); + logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", + topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); } catch (Exception e) { - logger.error("load mq composite flat file from metadata failed", e); + logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e); + } finally { + semaphore.release(); } - }); - futureList.add(future); - }); - - // Wait for load all metadata done - for (Future<?> future : futureList) { - future.get(); + }, TieredStoreExecutor.commitExecutor); + futures.add(future); + } catch (Exception e) { + throw new RuntimeException(e); } - metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet()); - } catch (Exception e) { - logger.error("load mq composite flat file from metadata failed", e); - return false; - } - return true; + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } public void cleanup() { - queueFlatFileMap.clear(); + flatFileConcurrentMap.clear(); cleanStaticReference(); } @@ -221,27 +246,25 @@ public class TieredFlatFileManager { @Nullable public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) { - return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> { + return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> { try { - logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + - "try to create new flat file: topic: {}, queueId: {}", + logger.debug("Create new TopicFlatFile, topic: {}, queueId: {}", messageQueue.getTopic(), messageQueue.getQueueId()); return new CompositeQueueFlatFile(tieredFileAllocator, mq); } catch (Exception e) { - logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + - "create new flat file: topic: {}, queueId: {}", + logger.debug("Create new TopicFlatFile failed, topic: {}, queueId: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e); - return null; } + return null; }); } public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) { - return queueFlatFileMap.get(messageQueue); + return flatFileConcurrentMap.get(messageQueue); } public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() { - return ImmutableList.copyOf(queueFlatFileMap.values()); + return ImmutableList.copyOf(flatFileConcurrentMap.values()); } public void shutdown() { @@ -270,7 +293,7 @@ public class TieredFlatFileManager { } // delete memory reference - CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq); + CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq); if (flatFile != null) { MessageQueue messageQueue = flatFile.getMessageQueue(); logger.info("TieredFlatFileManager#destroyCompositeFile: " + diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java index 1b232fc75..2f0fd71de 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.tieredstore.metadata; +import java.util.Objects; + public class FileSegmentMetadata { public static final int STATUS_NEW = 0; @@ -43,7 +45,6 @@ public class FileSegmentMetadata { this.baseOffset = baseOffset; this.type = type; this.status = STATUS_NEW; - this.createTimestamp = System.currentTimeMillis(); } public void markSealed() { @@ -122,4 +123,27 @@ public class FileSegmentMetadata { public void setSealTimestamp(long sealTimestamp) { this.sealTimestamp = sealTimestamp; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + FileSegmentMetadata metadata = (FileSegmentMetadata) o; + return size == metadata.size + && baseOffset == metadata.baseOffset + && status == metadata.status + && path.equals(metadata.path) + && type == metadata.type + && createTimestamp == metadata.createTimestamp + && beginTimestamp == metadata.beginTimestamp + && endTimestamp == metadata.endTimestamp + && sealTimestamp == metadata.sealTimestamp; + } + + @Override + public int hashCode() { + return Objects.hash(type, path, baseOffset, status, size, createTimestamp, beginTimestamp, endTimestamp, sealTimestamp); + } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java index e6adef1d1..5791dc9a4 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java @@ -116,19 +116,20 @@ public class TieredDispatcherTest { buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9); flatFile.appendCommitLog(buffer3); flatFile.commitCommitLog(); - Assert.assertEquals(10, flatFile.getDispatchOffset()); + Assert.assertEquals(9 + 1, flatFile.getDispatchOffset()); + Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset()); dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1); dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2); dispatcher.buildConsumeQueueAndIndexFile(); Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset()); - Assert.assertEquals(7, flatFile.getDispatchOffset()); dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1); dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2); dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3); dispatcher.buildConsumeQueueAndIndexFile(); - Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset()); + Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset()); + Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset()); } @Test @@ -142,6 +143,7 @@ public class TieredDispatcherTest { Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(0L); Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(9L); + // mock cq item, position = 7 ByteBuffer cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); cqItem.putLong(7); cqItem.putInt(MessageBufferUtilTest.MSG_LEN); @@ -150,13 +152,13 @@ public class TieredDispatcherTest { SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null); Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult); + // mock cq item, position = 8 cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); cqItem.putLong(8); cqItem.putInt(MessageBufferUtilTest.MSG_LEN); cqItem.putLong(1); cqItem.flip(); mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null); - Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult); mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null); @@ -167,7 +169,10 @@ public class TieredDispatcherTest { mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null); Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult); - dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq)); + CompositeQueueFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq); + Assert.assertNotNull(flatFile); + flatFile.initOffset(7); + dispatcher.dispatchFlatFile(flatFile); Assert.assertEquals(8, flatFileManager.getFlatFile(mq).getDispatchOffset()); } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java index d75b2f916..774c6cf64 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.tuple.Triple; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.GetMessageResult; @@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; -import org.apache.rocketmq.common.BoundaryType; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java index 27efe111e..2e028ada3 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java @@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest { Assert.assertEquals(AppendResult.SUCCESS, result); file.commit(true); - file.persistMetadata(); + file.flushMetadata(); QueueMetadata queueMetadata = metadataStore.getQueue(mq); Assert.assertEquals(53, queueMetadata.getMaxOffset()); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java index b7528c5e4..20fe4dd70 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java @@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest { CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq); Assert.assertNotNull(flatFile); - Assert.assertEquals(100, flatFile.getDispatchOffset()); + Assert.assertEquals(-1L, flatFile.getDispatchOffset()); + flatFile.initOffset(100L); + Assert.assertEquals(100L, flatFile.getDispatchOffset()); + flatFile.initOffset(200L); + Assert.assertEquals(100L, flatFile.getDispatchOffset()); CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1); Assert.assertNotNull(flatFile1); + flatFile1.initOffset(200L); Assert.assertEquals(200, flatFile1.getDispatchOffset()); flatFileManager.destroyCompositeFile(mq); -- 2.32.0.windows.2 From 99b39a35f29e491862296d56b7938a995d153974 Mon Sep 17 00:00:00 2001 From: ShuangxiDing <dingshuangxi888@gmail.com> Date: Thu, 10 Aug 2023 11:28:39 +0800 Subject: [PATCH 02/12] [ISSUE #7115] Fix grpc response message NPE (#7116) --- .../apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java index 0b3c85ea6..efa879a9c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java @@ -92,7 +92,7 @@ public class ResponseBuilder { public Status buildStatus(Code code, String message) { return Status.newBuilder() .setCode(code) - .setMessage(message) + .setMessage(message != null ? message : code.name()) .build(); } -- 2.32.0.windows.2 From c0ba453f38183266cf9a69be754e620311e1923b Mon Sep 17 00:00:00 2001 From: caigy <csgytsai@163.com> Date: Thu, 10 Aug 2023 14:08:17 +0800 Subject: [PATCH 03/12] [ISSUE #7129] Fix resource collisions in acl tests (#7130) * run acl tests sequencially to avoid collision * disable reuseForks for acl like broker * Revert "[ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135)" This reverts commit 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7. --- acl/pom.xml | 13 +++++++++++++ .../acl/plain/PlainAccessControlFlowTest.java | 5 ----- .../acl/plain/PlainAccessValidatorTest.java | 3 --- .../acl/plain/PlainPermissionManagerTest.java | 3 --- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/acl/pom.xml b/acl/pom.xml index 67bfcb8d2..989c0cf77 100644 --- a/acl/pom.xml +++ b/acl/pom.xml @@ -74,4 +74,17 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java index e7fd0932f..519345714 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java @@ -31,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -44,7 +43,6 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; - /** * <p> In this class, we'll test the following scenarios, each containing several consecutive operations on ACL, * <p> like updating and deleting ACL, changing config files and checking validations. @@ -52,9 +50,6 @@ import java.util.List; * <p> Case 2: Only conf/acl/plain_acl.yml exists; * <p> Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists. */ - -// Ignore this test case as it is currently unable to pass on ubuntu workflow -@Ignore public class PlainAccessControlFlowTest { public static final String DEFAULT_TOPIC = "topic-acl"; diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index a3a925758..ef0cffbdc 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -56,11 +56,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -// Ignore this test case as it is currently unable to pass on ubuntu workflow -@Ignore public class PlainAccessValidatorTest { private PlainAccessValidator plainAccessValidator; diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java index aa7539f3a..941d8c779 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java @@ -29,7 +29,6 @@ import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -42,8 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -// Ignore this test case as it is currently unable to pass on ubuntu workflow -@Ignore public class PlainPermissionManagerTest { PlainPermissionManager plainPermissionManager; -- 2.32.0.windows.2 From 8741ff8c9b3bdbfc97976285affa7ea35c81243c Mon Sep 17 00:00:00 2001 From: ShuangxiDing <dingshuangxi888@gmail.com> Date: Thu, 10 Aug 2023 17:41:15 +0800 Subject: [PATCH 04/12] [ISSUE #7153] Add switch for MIXED message type (#7154) Add a switch for MIXED message type when creating a Topic in the Broker. --- .../broker/processor/AdminBrokerProcessor.java | 8 ++++++++ .../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a6ce03dc2..bbddcec2d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -59,6 +59,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.AttributeParser; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.FIleReadaheadMode; import org.apache.rocketmq.common.constant.LoggerName; @@ -439,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { String attributesModification = requestHeader.getAttributes(); topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); + if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED + && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } + try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index a815636b1..99a5db5ad 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -393,6 +393,8 @@ public class BrokerConfig extends BrokerIdentity { */ private boolean enableSingleTopicRegister = false; + private boolean enableMixedMessageType = false; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1712,4 +1714,12 @@ public class BrokerConfig extends BrokerIdentity { public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) { this.enableSingleTopicRegister = enableSingleTopicRegister; } + + public boolean isEnableMixedMessageType() { + return enableMixedMessageType; + } + + public void setEnableMixedMessageType(boolean enableMixedMessageType) { + this.enableMixedMessageType = enableMixedMessageType; + } } -- 2.32.0.windows.2 From f534501855f8edbcb58f5b856973bf1027b5cf3a Mon Sep 17 00:00:00 2001 From: Steven <shirenchuang@users.noreply.github.com> Date: Fri, 11 Aug 2023 10:25:48 +0800 Subject: [PATCH 05/12] [Feature 7155] add errlog when cmd err (#7157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 十真 <shirenchuang.src@cainiao.com> --- .../src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index b00bad3c5..5a8a7cd54 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -52,6 +52,7 @@ public class ServerUtil { System.exit(0); } } catch (ParseException e) { + System.err.println(e.getMessage()); hf.printHelp(appName, options, true); System.exit(1); } -- 2.32.0.windows.2 From db58f00c0fe0f129611d654291f2177de55dc9ff Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Fri, 11 Aug 2023 19:18:30 +0800 Subject: [PATCH 06/12] [ISSUE #7169] Change metadataThreadPoolQueueCapacity to 100000 (#7170) --- .../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 4f57a7052..39caaa0d9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -165,7 +165,7 @@ public class ProxyConfig implements ConfigFile { private int subscriptionGroupConfigCacheExpiredInSeconds = 20; private int subscriptionGroupConfigCacheMaxNum = 20000; private int metadataThreadPoolNums = 3; - private int metadataThreadPoolQueueCapacity = 1000; + private int metadataThreadPoolQueueCapacity = 100000; private int transactionHeartbeatThreadPoolNums = 20; private int transactionHeartbeatThreadPoolQueueCapacity = 200; -- 2.32.0.windows.2 From 1f04e68a2e331ab035b791280c5a91b60fe0c85f Mon Sep 17 00:00:00 2001 From: yx9o <yangx_soft@163.com> Date: Sat, 12 Aug 2023 21:12:22 +0800 Subject: [PATCH 07/12] [ISSUE #7172] Unified Chinese for Name Server (#7173) --- docs/cn/concept.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cn/concept.md b/docs/cn/concept.md index cb2c863bd..3d67e9371 100644 --- a/docs/cn/concept.md +++ b/docs/cn/concept.md @@ -17,7 +17,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 ## 6 名字服务(Name Server) - 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 +名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 ## 7 拉取式消费(Pull Consumer) Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 -- 2.32.0.windows.2 From 25005060bbace477eeaaf4c0142cece5213efbbf Mon Sep 17 00:00:00 2001 From: yx9o <yangx_soft@163.com> Date: Sun, 13 Aug 2023 20:52:17 +0800 Subject: [PATCH 08/12] [ISSUE #7176] Correct mismatched logs (#7177) --- .../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 0055a1cc8..f7a95f0a6 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -522,7 +522,7 @@ public class RouteInfoManager { this.lock.writeLock().unlock(); } } catch (Exception e) { - log.error("wipeWritePermOfBrokerByLock Exception", e); + log.error("addWritePermOfBrokerByLock Exception", e); } return 0; } -- 2.32.0.windows.2 From ac411daa27117e9115a8fc5e2d5753085f009ed9 Mon Sep 17 00:00:00 2001 From: yx9o <yangx_soft@163.com> Date: Tue, 15 Aug 2023 08:31:00 +0800 Subject: [PATCH 09/12] [ISSUE #7183] Correct mismatched commandDesc (#7184) --- .../tools/command/topic/RemappingStaticTopicSubCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index 849f680d0..2a08fdb5b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -47,7 +47,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update or create static topic, which has fixed number of queues"; + return "Remapping static topic."; } @Override -- 2.32.0.windows.2 From 55e0cdb2af3ab75a6d892f919d60797f17a99fda Mon Sep 17 00:00:00 2001 From: redlsz <szliu0927@gmail.com> Date: Tue, 15 Aug 2023 19:19:45 +0800 Subject: [PATCH 10/12] fix: IndexOutOfBoundsException when process pop response (#7003) --- .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++- .../rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++- .../rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 708a6acd1..5101ffc8e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))); continue; } - key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId()); + // Value of POP_CK is used to determine whether it is a pop retry, + // cause topic could be rewritten by broker. + key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), + messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId()); if (!sortMap.containsKey(key)) { sortMap.put(key, new ArrayList<>(4)); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index 115c140ff..eb2c4d9ee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService { // <topicMark@queueId, msg queueOffset> Map<String, List<Long>> sortMap = new HashMap<>(16); for (MessageExt messageExt : messageExtList) { - String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId()); + // Value of POP_CK is used to determine whether it is a pop retry, + // cause topic could be rewritten by broker. + String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), + messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId()); if (!sortMap.containsKey(key)) { sortMap.put(key, new ArrayList<>(4)); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java index 9a5fa89ab..13094331e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java @@ -282,6 +282,10 @@ public class ExtraInfoUtil { return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; } + public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) { + return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; + } + public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) { return QUEUE_OFFSET + queueId + "%" + queueOffset; } -- 2.32.0.windows.2 From a9c0b43f7f6ce5acfc4f2f3069553071fa93dfee Mon Sep 17 00:00:00 2001 From: yx9o <yangx_soft@163.com> Date: Wed, 16 Aug 2023 18:45:00 +0800 Subject: [PATCH 11/12] [ISSUE #7192] Correct typos (#7193) --- .../tools/command/consumer/ConsumerProgressSubCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index f51a24673..97125b854 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -54,7 +54,7 @@ public class ConsumerProgressSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query consumers's progress, speed"; + return "Query consumer's progress, speed."; } @Override -- 2.32.0.windows.2 From 5a3de926b816db5a121c1d788430072a3bc942ae Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Wed, 16 Aug 2023 20:52:53 +0800 Subject: [PATCH 12/12] Optimize updateSubscription check exist loop (#7190) --- .../broker/client/ConsumerGroupInfo.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 867b9c720..1ea58c125 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -172,7 +173,7 @@ public class ConsumerGroupInfo { */ public boolean updateSubscription(final Set<SubscriptionData> subList) { boolean updated = false; - + Set<String> topicSet = new HashSet<>(); for (SubscriptionData sub : subList) { SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); if (old == null) { @@ -194,22 +195,16 @@ public class ConsumerGroupInfo { this.subscriptionTable.put(sub.getTopic(), sub); } + // Add all new topics to the HashSet + topicSet.add(sub.getTopic()); } Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, SubscriptionData> next = it.next(); String oldTopic = next.getKey(); - - boolean exist = false; - for (SubscriptionData sub : subList) { - if (sub.getTopic().equals(oldTopic)) { - exist = true; - break; - } - } - - if (!exist) { + // Check HashSet with O(1) time complexity + if (!topicSet.contains(oldTopic)) { log.warn("subscription changed, group: {} remove topic {} {}", this.groupName, oldTopic, -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2