Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch033-backport-Lock-granula...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch of Package rocketmq
From ead3d905016d9db4785a46beaa555c7fafd4f9bb Mon Sep 17 00:00:00 2001 From: Dongyuan Pan <dongyuanpan0@gmail.com> Date: Wed, 8 Nov 2023 10:40:52 +0800 Subject: [PATCH 1/2] [ISSUE #7511] Lock granularity issue causing LMQ message loss (#7525) * bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in topicQueueLock, should be in putMessageLock * fix MultiDispatchTest * fix MultiDispatchTest * fix unit test --- .../common/message/MessageExtBrokerInner.java | 10 ++ .../org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++-- .../apache/rocketmq/store/ConsumeQueue.java | 44 +------ .../rocketmq/store/DefaultMessageStore.java | 1 - .../rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++-- .../apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++ .../queue/AbstractConsumeQueueStore.java | 10 ++ .../store/queue/ConsumeQueueInterface.java | 1 - .../queue/ConsumeQueueStoreInterface.java | 14 +++ ...iDispatch.java => MultiDispatchUtils.java} | 17 +-- .../store/queue/QueueOffsetOperator.java | 6 +- .../store/queue/RocksDBConsumeQueue.java | 42 ------- .../rocketmq/store/AppendCallbackTest.java | 6 +- .../rocketmq/store/AppendPropCRCTest.java | 5 +- .../rocketmq/store/MultiDispatchTest.java | 12 +- .../rocketmq/store/kv/CompactionLogTest.java | 2 +- 16 files changed, 322 insertions(+), 137 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java rename store/src/main/java/org/apache/rocketmq/store/queue/{MultiDispatch.java => MultiDispatchUtils.java} (78%) diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 52501dbca..147f23f12 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt { private ByteBuffer encodedBuff; + private volatile boolean encodeCompleted; + private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1; public ByteBuffer getEncodedBuff() { @@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt { this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties())); } } + + public boolean isEncodeCompleted() { + return encodeCompleted; + } + + public void setEncodeCompleted(boolean encodeCompleted) { + this.encodeCompleted = encodeCompleted; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 6c3afde70..35c1d0e2d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -35,6 +35,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import com.sun.jna.NativeLong; import com.sun.jna.Pointer; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; @@ -101,6 +103,7 @@ public class CommitLog implements Swappable { protected int commitLogSize; private final boolean enabledAppendPropCRC; + protected final MultiDispatch multiDispatch; public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); @@ -119,13 +122,11 @@ public class CommitLog implements Swappable { this.flushManager = new DefaultFlushManager(); this.coldDataCheckService = new ColdDataCheckService(); - this.appendMessageCallback = new DefaultAppendMessageCallback(); + this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig()); putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() { @Override protected PutMessageThreadLocal initialValue() { - return new PutMessageThreadLocal( - defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(), - defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC()); + return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig()); } }; this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); @@ -137,6 +138,8 @@ public class CommitLog implements Swappable { this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); + + this.multiDispatch = new MultiDispatch(defaultMessageStore); } public void setFullStorePaths(Set<String> fullStorePaths) { @@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable { // Store the message content private final ByteBuffer msgStoreItemMemory; private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN; + private final MessageStoreConfig messageStoreConfig; - DefaultAppendMessageCallback() { + DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) { this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); + this.messageStoreConfig = messageStoreConfig; + } + + public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) { + if (msgInner.isEncodeCompleted()) { + return null; + } + + multiDispatch.wrapMultiDispatch(msgInner); + + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + + final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); + } + + int msgLenWithoutProperties = preEncodeBuffer.getInt(0); + + int msgLen = msgLenWithoutProperties + 2 + propertiesLength; + + // Exceeds the maximum message + if (msgLen > this.messageStoreConfig.getMaxMessageSize()) { + log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize()); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + + // Back filling total message length + preEncodeBuffer.putInt(0, msgLen); + // Modify position to msgLenWithoutProperties + preEncodeBuffer.position(msgLenWithoutProperties); + + preEncodeBuffer.putShort((short) propertiesLength); + + if (propertiesLength > crc32ReservedLength) { + preEncodeBuffer.put(propertiesData); + } + + if (needAppendLastPropertySeparator) { + preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR); + } + // 18 CRC32 + preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength); + + msgInner.setEncodeCompleted(true); + + return null; } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> + ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); + boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + if (isMultiDispatchMsg) { + AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); + if (appendMessageResult != null) { + return appendMessageResult; + } + } + + final int msgLen = preEncodeBuffer.getInt(0); + preEncodeBuffer.position(0); + preEncodeBuffer.limit(msgLen); + // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable { break; } - ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - final int msgLen = preEncodeBuffer.getInt(0); - // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); @@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable { byteBuffer.put(preEncodeBuffer); CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS"); msgInner.setEncodedBuff(null); + + if (isMultiDispatchMsg) { + CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); + } + return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum); } @@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable { return flushManager; } + public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { + return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + private boolean isCloseReadAhead() { return !MixAll.isWindows() && !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 623509c8b..453c9d1dc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.queue.FileQueueLifeCycle; -import org.apache.rocketmq.store.queue.MultiDispatch; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.queue.QueueOffsetOperator; import org.apache.rocketmq.store.queue.ReferredIterator; @@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); - if (MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) { + if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) { multiDispatchLmqQueue(request, maxRetries); } return; @@ -776,28 +775,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { String topicQueueKey = getTopic() + "-" + getQueueId(); long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); msg.setQueueOffset(queueOffset); - - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - Long[] queueOffsets = new Long[queues.length]; - for (int i = 0; i < queues.length; i++) { - if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { - String key = MultiDispatch.lmqQueueKey(queues[i]); - queueOffsets[i] = queueOffsetOperator.getLmqOffset(key); - } - } - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, - StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); - msg.removeWaitStorePropertyString(); } @Override @@ -805,23 +782,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { short messageNum) { String topicQueueKey = getTopic() + "-" + getQueueId(); queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - for (int i = 0; i < queues.length; i++) { - if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { - String key = MultiDispatch.lmqQueueKey(queues[i]); - queueOffsetOperator.increaseLmqOffset(key, (short) 1); - } - } } private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 99a54e2d7..dc5f312e5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore { } } - @Override public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index c1d808728..20e9a652b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.config.MessageStoreConfig; public class MessageExtEncoder { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -38,20 +39,22 @@ public class MessageExtEncoder { // The maximum length of the full message. private int maxMessageSize; private final int crc32ReservedLength; + private MessageStoreConfig messageStoreConfig; - public MessageExtEncoder(final int maxMessageBodySize) { - this(maxMessageBodySize, false); + public MessageExtEncoder(final int maxMessageBodySize, final MessageStoreConfig messageStoreConfig) { + this(messageStoreConfig); } - public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) { + public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) { ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + this.messageStoreConfig = messageStoreConfig; + this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize(); //Reserve 64kb for encoding buffer outside body int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE; byteBuf = alloc.directBuffer(maxMessageSize); - this.maxMessageBodySize = maxMessageBodySize; this.maxMessageSize = maxMessageSize; - this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0; + this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0; } public static int calMsgLength(MessageVersion messageVersion, @@ -79,8 +82,103 @@ public class MessageExtEncoder { + 2 + (Math.max(propertiesLength, 0)); //propertiesLength } + public static int calMsgLengthNoProperties(MessageVersion messageVersion, + int sysFlag, int bodyLength, int topicLength) { + + int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; + int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20; + + return 4 //TOTALSIZE + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + bornhostLength //BORNHOST + + 8 //STORETIMESTAMP + + storehostAddressLength //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (Math.max(bodyLength, 0)) //BODY + + messageVersion.getTopicLengthSize() + topicLength; //TOPIC + } + + public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) { + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + // Exceeds the maximum message body + if (bodyLength > this.maxMessageBodySize) { + CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageBodySize); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength); + + // 1 TOTALSIZE + this.byteBuf.writeInt(msgLenNoProperties); + // 2 MAGICCODE + this.byteBuf.writeInt(msgInner.getVersion().getMagicCode()); + // 3 BODYCRC + this.byteBuf.writeInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.byteBuf.writeInt(msgInner.getQueueId()); + // 5 FLAG + this.byteBuf.writeInt(msgInner.getFlag()); + // 6 QUEUEOFFSET, need update later + this.byteBuf.writeLong(0); + // 7 PHYSICALOFFSET, need update later + this.byteBuf.writeLong(0); + // 8 SYSFLAG + this.byteBuf.writeInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.byteBuf.writeLong(msgInner.getBornTimestamp()); + + // 10 BORNHOST + ByteBuffer bornHostBytes = msgInner.getBornHostBytes(); + this.byteBuf.writeBytes(bornHostBytes.array()); + + // 11 STORETIMESTAMP + this.byteBuf.writeLong(msgInner.getStoreTimestamp()); + + // 12 STOREHOSTADDRESS + ByteBuffer storeHostBytes = msgInner.getStoreHostBytes(); + this.byteBuf.writeBytes(storeHostBytes.array()); + + // 13 RECONSUMETIMES + this.byteBuf.writeInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.byteBuf.writeInt(bodyLength); + if (bodyLength > 0) + this.byteBuf.writeBytes(msgInner.getBody()); + + // 16 TOPIC + if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) { + this.byteBuf.writeShort((short) topicLength); + } else { + this.byteBuf.writeByte((byte) topicLength); + } + this.byteBuf.writeBytes(topicData); + + return null; + } + public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); + + if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + return encodeWithoutProperties(msgInner); + } + /** * Serialize message */ @@ -303,7 +401,7 @@ public class MessageExtEncoder { } public ByteBuffer getEncoderBuffer() { - return this.byteBuf.nioBuffer(); + return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); } public int getMaxMessageBodySize() { @@ -322,12 +420,8 @@ public class MessageExtEncoder { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; - PutMessageThreadLocal(int size) { - this(size, false); - } - - PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) { - encoder = new MessageExtEncoder(size, enabledAppendPropCRC); + PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) { + encoder = new MessageExtEncoder(messageStoreConfig); keyBuilder = new StringBuilder(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java new file mode 100644 index 000000000..5bc587a8e --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; + +/** + * MultiDispatch for lmq, not-thread-safe + */ +public class MultiDispatch { + private final StringBuilder keyBuilder = new StringBuilder(); + private final DefaultMessageStore messageStore; + private static final short VALUE_OF_EACH_INCREMENT = 1; + + public MultiDispatch(DefaultMessageStore messageStore) { + this.messageStore = messageStore; + } + + public String queueKey(String queueName, MessageExtBrokerInner msgInner) { + keyBuilder.delete(0, keyBuilder.length()); + keyBuilder.append(queueName); + keyBuilder.append('-'); + int queueId = msgInner.getQueueId(); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + keyBuilder.append(queueId); + return keyBuilder.toString(); + } + + public void wrapMultiDispatch(final MessageExtBrokerInner msg) { + + String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + Long[] queueOffsets = new Long[queues.length]; + if (messageStore.getMessageStoreConfig().isEnableLmq()) { + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msg); + if (MixAll.isLmq(key)) { + queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); + } + } + } + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, + StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); + msg.removeWaitStorePropertyString(); + } + + public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { + String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + for (String queue : queues) { + String key = queueKey(queue, msgInner); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT); + } + } + } +} \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java index 30054fa50..d76b05577 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java @@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum); } + @Override + public void increaseLmqOffset(String queueKey, short messageNum) { + queueOffsetOperator.increaseLmqOffset(queueKey, messageNum); + } + + @Override + public long getLmqQueueOffset(String queueKey) { + return queueOffsetOperator.getLmqOffset(queueKey); + } + @Override public void removeTopicQueueTable(String topic, Integer queueId) { this.queueOffsetOperator.remove(topic, queueId); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index c65f2a68b..768c782b1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { */ void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg) throws RocksDBException; - /** * Increase queue offset. * @param queueOffsetAssigner the delegated queue offset assigner diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java index 268803dcc..e68880a82 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java @@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface { */ void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum); + /** + * Increase lmq offset + * @param queueKey + * @param messageNum + */ + void increaseLmqOffset(String queueKey, short messageNum); + + /** + * get lmq queue offset + * @param queueKey + * @return + */ + long getLmqQueueOffset(String queueKey); + /** * recover topicQueue table by minPhyOffset * @param minPhyOffset diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java similarity index 78% rename from store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java rename to store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java index d6291d908..44397a2fc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.store.queue; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.config.MessageStoreConfig; -public class MultiDispatch { +public class MultiDispatchUtils { public static String lmqQueueKey(String queueName) { StringBuilder keyBuilder = new StringBuilder(); @@ -60,17 +58,4 @@ public class MultiDispatch { } return true; } - - public static List<DispatchRequest> checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, List<DispatchRequest> dispatchRequests) { - if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == null || dispatchRequests.size() == 0) { - return null; - } - List<DispatchRequest> result = new ArrayList<>(); - for (DispatchRequest dispatchRequest : dispatchRequests) { - if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) { - result.add(dispatchRequest); - } - } - return dispatchRequests; - } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index 8da374828..5b4bf994e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -71,9 +71,9 @@ public class QueueOffsetOperator { return this.lmqTopicQueueTable.get(topicQueueKey); } - public void increaseLmqOffset(String topicQueueKey, short messageNum) { - Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L); - this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum); + public void increaseLmqOffset(String queueKey, short messageNum) { + Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 0L); + this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum); } public long currentQueueOffset(String topicQueueKey) { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 759be395d..5a981bb4d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue; import java.nio.ByteBuffer; import java.util.List; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BoundaryType; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface { queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset); } msg.setQueueOffset(queueOffset); - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - Long[] queueOffsets = new Long[queues.length]; - for (int i = 0; i < queues.length; i++) { - if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { - String key = MultiDispatch.lmqQueueKey(queues[i]); - queueOffsets[i] = queueOffsetOperator.getLmqTopicQueueNextOffset(key); - } - } - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, - StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); - msg.removeWaitStorePropertyString(); } @Override public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) { String topicQueueKey = getTopic() + "-" + getQueueId(); queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - for (int i = 0; i < queues.length; i++) { - if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { - String key = MultiDispatch.lmqQueueKey(queues[i]); - queueOffsetOperator.increaseLmqOffset(key, (short) 1); - } - } } @Override diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index 87bfe85da..374857149 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -44,7 +44,7 @@ public class AppendCallbackTest { AppendMessageCallback callback; - MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); + MessageExtEncoder batchEncoder; @Before public void init() throws Exception { @@ -53,12 +53,14 @@ public class AppendCallbackTest { messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); //too much reference DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); CommitLog commitLog = new CommitLog(messageStore); - callback = commitLog.new DefaultAppendMessageCallback(); + callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); + batchEncoder = new MessageExtEncoder(messageStoreConfig); } @After diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java index c8ed4d74d..d882fc9d9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java @@ -56,6 +56,7 @@ public class AppendPropCRCTest { messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); messageStoreConfig.setForceVerifyPropCRC(true); @@ -63,8 +64,8 @@ public class AppendPropCRCTest { //too much reference DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); commitLog = new CommitLog(messageStore); - encoder = new MessageExtEncoder(10 * 1024 * 1024, true); - callback = commitLog.new DefaultAppendMessageCallback(); + encoder = new MessageExtEncoder(messageStoreConfig); + callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); } @After diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index 2447bbf68..eae5eaa07 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.queue.MultiDispatch; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.rocksdb.RocksDBException; -import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MultiDispatchTest { - private ConsumeQueue consumeQueue; + private MultiDispatch multiDispatch; private DefaultMessageStore messageStore; @@ -61,8 +60,7 @@ public class MultiDispatchTest { BrokerConfig brokerConfig = new BrokerConfig(); //too much reference messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>()); - consumeQueue = new ConsumeQueue("xxx", 0, - getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore); + multiDispatch = new MultiDispatch(messageStore); } @After @@ -74,14 +72,14 @@ public class MultiDispatchTest { public void lmqQueueKey() { MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); when(messageExtBrokerInner.getQueueId()).thenReturn(2); - String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123"); + String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123"); assertEquals(ret, "%LMQ%lmq123-0"); } @Test public void wrapMultiDispatch() throws RocksDBException { MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue(); - messageStore.assignOffset(messageExtBrokerInner); + multiDispatch.wrapMultiDispatch(messageExtBrokerInner); assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0"); } diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java index df3c31c6e..e113b18f1 100644 --- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java @@ -86,7 +86,7 @@ public class CompactionLogTest { int compactionCqFileSize = 1024; - private static MessageExtEncoder encoder = new MessageExtEncoder(1024); + private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new MessageStoreConfig()); private static SocketAddress storeHost; private static SocketAddress bornHost; -- 2.32.0.windows.2 From 70dc93abbcb9bf161378d66fcaca55bedc78b905 Mon Sep 17 00:00:00 2001 From: yangguodong <1174533476@qq.com> Date: Wed, 8 Nov 2023 21:14:54 -0600 Subject: [PATCH 2/2] Fix tiered store README.md error about Configuration (#7436) * Fix tiered store README.md error about Configuration * Fix change tieredStoreFilePath to tieredStoreFilepath * revert README.md change --------- Co-authored-by: yangguodong.cn <yangguodong.cn@bytedance.com> --- .../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++----- .../tieredstore/provider/posix/PosixFileSegment.java | 4 ++-- .../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +- .../provider/posix/PosixFileSegmentTest.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java index 595db6b86..a112ea6b1 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java @@ -115,7 +115,7 @@ public class TieredMessageStoreConfig { private long readAheadCacheExpireDuration = 10 * 1000; private double readAheadCacheSizeThresholdRate = 0.3; - private String tieredStoreFilePath = ""; + private String tieredStoreFilepath = ""; private String objectStoreEndpoint = ""; @@ -350,12 +350,12 @@ public class TieredMessageStoreConfig { this.readAheadCacheSizeThresholdRate = rate; } - public String getTieredStoreFilePath() { - return tieredStoreFilePath; + public String getTieredStoreFilepath() { + return tieredStoreFilepath; } - public void setTieredStoreFilePath(String tieredStoreFilePath) { - this.tieredStoreFilePath = tieredStoreFilePath; + public void setTieredStoreFilepath(String tieredStoreFilepath) { + this.tieredStoreFilepath = tieredStoreFilepath; } public void setObjectStoreEndpoint(String objectStoreEndpoint) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 7e949cb28..708ce33f9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment { super(storeConfig, fileType, filePath, baseOffset); // basePath - String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(), - StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator)); + String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(), + StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator)); // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset String brokerClusterName = storeConfig.getBrokerClusterName(); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java index 6693d3cb7..80cdba977 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java @@ -49,7 +49,7 @@ public class TieredCommitLogTest { TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); storeConfig.setBrokerName("brokerName"); storeConfig.setStorePathRootDir(storePath); - storeConfig.setTieredStoreFilePath(storePath + File.separator); + storeConfig.setTieredStoreFilepath(storePath + File.separator); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); storeConfig.setCommitLogRollingInterval(0); storeConfig.setTieredStoreCommitLogMaxSize(1000); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java index db33ae847..ede62b8ce 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java @@ -42,7 +42,7 @@ public class PosixFileSegmentTest { @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setTieredStoreFilePath(storePath); + storeConfig.setTieredStoreFilepath(storePath); mq = new MessageQueue("OSSFileSegmentTest", "broker", 0); TieredStoreExecutor.init(); } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2