Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch031-backport-Add-CRC-chec...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch031-backport-Add-CRC-check-of-commitlog.patch of Package rocketmq
From 91349f30b96db2e16b71d65a535d81f11b60bda5 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Wed, 25 Oct 2023 14:54:00 +0800 Subject: [PATCH 1/2] [ISSUE #7437] Add the CRC check of commitlog (#7468) * Added CRC32 check for full data * add unit test * add MessageExtBrokerInnerTest * fix codestyle * fix codestyle --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../org/apache/rocketmq/common/UtilAll.java | 14 ++ .../rocketmq/common/message/MessageConst.java | 2 + .../common/message/MessageDecoder.java | 32 ++- .../common/message/MessageExtBrokerInner.java | 49 +++++ .../common/MessageExtBrokerInnerTest.java | 93 ++++++++ .../org/apache/rocketmq/store/CommitLog.java | 87 +++++++- .../rocketmq/store/MessageExtEncoder.java | 38 +++- .../store/config/MessageStoreConfig.java | 23 ++ .../rocketmq/store/AppendPropCRCTest.java | 200 ++++++++++++++++++ .../rocketmq/store/BatchPutMessageTest.java | 2 +- .../store/MessageExtBrokerInnerTest.java | 105 +++++++++ .../store/ha/autoswitch/AutoSwitchHATest.java | 2 +- .../file/CompositeQueueFlatFileTest.java | 2 +- .../util/MessageBufferUtilTest.java | 2 +- 14 files changed, 630 insertions(+), 21 deletions(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java create mode 100644 store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index d2b7c374b..95b6b09b4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -307,6 +307,20 @@ public class UtilAll { return (int) (crc32.getValue() & 0x7FFFFFFF); } + public static int crc32(ByteBuffer byteBuffer) { + CRC32 crc32 = new CRC32(); + crc32.update(byteBuffer); + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + + public static int crc32(ByteBuffer[] byteBuffers) { + CRC32 crc32 = new CRC32(); + for (ByteBuffer buffer : byteBuffers) { + crc32.update(buffer); + } + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 87fed7c19..24f7bdb99 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -97,6 +97,7 @@ public class MessageConst { public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY"; public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL"; public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS"; + public static final String PROPERTY_CRC32 = "__CRC32#"; /** * properties for DLQ @@ -155,5 +156,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP); STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC); STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID); + STRING_HASH_SET.add(PROPERTY_CRC32); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 6de0b69fb..b053f8275 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common.message; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.Inet4Address; import java.net.InetAddress; @@ -152,6 +153,34 @@ public class MessageDecoder { return null; } + public static void createCrc32(final ByteBuffer input, int crc32) { + input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); + input.put((byte) NAME_VALUE_SEPARATOR); + for (int i = 0; i < 10; i++) { + byte b = '0'; + if (crc32 > 0) { + b += (byte) (crc32 % 10); + crc32 /= 10; + } + input.put(b); + } + input.put((byte) PROPERTY_SEPARATOR); + } + + public static void createCrc32(final ByteBuf input, int crc32) { + input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); + input.writeByte((byte) NAME_VALUE_SEPARATOR); + for (int i = 0; i < 10; i++) { + byte b = '0'; + if (crc32 > 0) { + b += (byte) (crc32 % 10); + crc32 /= 10; + } + input.writeByte(b); + } + input.writeByte((byte) PROPERTY_SEPARATOR); + } + public static MessageExt decode(ByteBuffer byteBuffer) { return decode(byteBuffer, true, true, false); } @@ -601,9 +630,6 @@ public class MessageDecoder { sb.append(value); sb.append(PROPERTY_SEPARATOR); } - if (sb.length() > 0) { - sb.deleteCharAt(sb.length() - 1); - } return sb.toString(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 91599653c..4e5d3419a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -20,6 +20,9 @@ import java.nio.ByteBuffer; import org.apache.rocketmq.common.TopicFilterType; +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + public class MessageExtBrokerInner extends MessageExt { private static final long serialVersionUID = 7256001576878700634L; private String propertiesString; @@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt { this.propertiesString = propertiesString; } + + public void deleteProperty(String name) { + super.clearProperty(name); + if (propertiesString != null) { + int idx0 = 0; + int idx1; + int idx2; + idx1 = propertiesString.indexOf(name, idx0); + if (idx1 != -1) { + // cropping may be required + StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); + while (true) { + int startIdx = idx0; + while (true) { + idx1 = propertiesString.indexOf(name, startIdx); + if (idx1 == -1) { + break; + } + startIdx = idx1 + name.length(); + if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { + if (propertiesString.length() > idx1 + name.length() + && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { + break; + } + } + } + if (idx1 == -1) { + // there are no characters that need to be skipped. Append all remaining characters. + stringBuilder.append(propertiesString, idx0, propertiesString.length()); + break; + } + // there are characters that need to be cropped + stringBuilder.append(propertiesString, idx0, idx1); + // move idx2 to the end of the cropped character + idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); + // all subsequent characters will be cropped + if (idx2 == -1) { + break; + } + idx0 = idx2 + 1; + } + this.setPropertiesString(stringBuilder.toString()); + } + } + } + public long getTagsCode() { return tagsCode; } diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java new file mode 100644 index 000000000..77d69e5ad --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageExtBrokerInnerTest { + @Test + public void testDeleteProperty() { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + String propertiesString = ""; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 93102799b..3d3ee86b8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -73,6 +73,10 @@ public class CommitLog implements Swappable { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // End of file empty MAGIC CODE cbd43194 public final static int BLANK_MAGIC_CODE = -875286124; + /** + * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR] + */ + public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1; protected final MappedFileQueue mappedFileQueue; protected final DefaultMessageStore defaultMessageStore; @@ -96,6 +100,8 @@ public class CommitLog implements Swappable { protected int commitLogSize; + private final boolean enabledAppendPropCRC; + public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { @@ -117,7 +123,9 @@ public class CommitLog implements Swappable { putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() { @Override protected PutMessageThreadLocal initialValue() { - return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + return new PutMessageThreadLocal( + defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(), + defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC()); } }; this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); @@ -127,6 +135,8 @@ public class CommitLog implements Swappable { this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum()); this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + + this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); } public void setFullStorePaths(Set<String> fullStorePaths) { @@ -470,10 +480,16 @@ public class CommitLog implements Swappable { byteBuffer.get(bytesContent, 0, bodyLen); if (checkCRC) { - int crc = UtilAll.crc32(bytesContent, 0, bodyLen); - if (crc != bodyCRC) { - log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); - return new DispatchRequest(-1, false/* success */); + /** + * When the forceVerifyPropCRC = false, + * use original bodyCrc validation. + */ + if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { + int crc = UtilAll.crc32(bytesContent, 0, bodyLen); + if (crc != bodyCRC) { + log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); + return new DispatchRequest(-1, false/* success */); + } } } } else { @@ -531,6 +547,43 @@ public class CommitLog implements Swappable { } } + if (checkCRC) { + /** + * When the forceVerifyPropCRC = true, + * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail) + */ + if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { + int expectedCRC = -1; + if (propertiesMap != null) { + String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32); + if (crc32Str != null) { + expectedCRC = 0; + for (int i = crc32Str.length() - 1; i >= 0; i--) { + int num = crc32Str.charAt(i) - '0'; + expectedCRC *= 10; + expectedCRC += num; + } + } + } + if (expectedCRC > 0) { + ByteBuffer tmpBuffer = byteBuffer.duplicate(); + tmpBuffer.position(tmpBuffer.position() - totalSize); + tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN); + int crc = UtilAll.crc32(tmpBuffer); + if (crc != expectedCRC) { + log.warn( + "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected " + + "CRC={}, actual CRC={}", bodyCRC, crc); + return new DispatchRequest(-1, false/* success */); + } + } else { + log.warn( + "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties"); + return new DispatchRequest(-1, false/* success */); + } + } + } + int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength); if (totalSize != readLength) { doNothingForDeadCode(reconsumeTimes); @@ -846,9 +899,12 @@ public class CommitLog implements Swappable { if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { msg.setStoreTimestamp(System.currentTimeMillis()); } - // Set the message body CRC (consider the most appropriate setting on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); + if (enabledAppendPropCRC) { + // delete crc32 properties if exist + msg.deleteProperty(MessageConst.PROPERTY_CRC32); + } // Back to Results AppendMessageResult result = null; @@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable { private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // Store the message content private final ByteBuffer msgStoreItemMemory; + private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN; DefaultAppendMessageCallback() { this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); @@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable { pos += 8 + 4 + 8 + ipLen; // refresh store time stamp in lock preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); + if (enabledAppendPropCRC) { + // 18 CRC32 + int checkSize = msgLen - crc32ReservedLength; + ByteBuffer tmpBuffer = preEncodeBuffer.duplicate(); + tmpBuffer.limit(tmpBuffer.position() + checkSize); + int crc32 = UtilAll.crc32(tmpBuffer); + tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength); + MessageDecoder.createCrc32(tmpBuffer, crc32); + } final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS"); @@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable { pos += 8 + 4 + 8 + bornHostLength; // refresh store time stamp in lock messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); + if (enabledAppendPropCRC) { + //append crc32 + int checkSize = msgLen - crc32ReservedLength; + ByteBuffer tmpBuffer = messagesByteBuff.duplicate(); + tmpBuffer.position(msgPos).limit(msgPos + checkSize); + int crc32 = UtilAll.crc32(tmpBuffer); + messagesByteBuff.position(msgPos + checkSize); + MessageDecoder.createCrc32(messagesByteBuff, crc32); + } putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; queueOffset++; diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index ee609a337..c1d808728 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.store; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; +import java.nio.ByteBuffer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; @@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - public class MessageExtEncoder { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private ByteBuf byteBuf; @@ -38,7 +37,13 @@ public class MessageExtEncoder { private int maxMessageBodySize; // The maximum length of the full message. private int maxMessageSize; + private final int crc32ReservedLength; + public MessageExtEncoder(final int maxMessageBodySize) { + this(maxMessageBodySize, false); + } + + public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) { ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; //Reserve 64kb for encoding buffer outside body int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? @@ -46,6 +51,7 @@ public class MessageExtEncoder { byteBuf = alloc.directBuffer(maxMessageSize); this.maxMessageBodySize = maxMessageBodySize; this.maxMessageSize = maxMessageSize; + this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0; } public static int calMsgLength(MessageVersion messageVersion, @@ -81,10 +87,13 @@ public class MessageExtEncoder { final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && propertiesData != null && propertiesData.length > 0 + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + + final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); + log.warn("putMessage message properties length too long. length={}", propertiesLength); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } @@ -160,8 +169,14 @@ public class MessageExtEncoder { // 17 PROPERTIES this.byteBuf.writeShort((short) propertiesLength); - if (propertiesLength > 0) + if (propertiesLength > crc32ReservedLength) { this.byteBuf.writeBytes(propertiesData); + } + if (needAppendLastPropertySeparator) { + this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR); + } + // 18 CRC32 + this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); return null; } @@ -213,10 +228,11 @@ public class MessageExtEncoder { final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - final int topicLengthSize = messageExtBatch.getVersion().getTopicLengthSize(); int totalPropLen = needAppendLastPropertySeparator ? - propertiesLen + batchPropLen + topicLengthSize : propertiesLen + batchPropLen; + propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen; + // properties need to add crc32 + totalPropLen += crc32ReservedLength; final int msgLen = calMsgLength( messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen); @@ -278,6 +294,7 @@ public class MessageExtEncoder { } this.byteBuf.writeBytes(batchPropData, 0, batchPropLen); } + this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); } putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); @@ -304,8 +321,13 @@ public class MessageExtEncoder { static class PutMessageThreadLocal { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; + PutMessageThreadLocal(int size) { - encoder = new MessageExtEncoder(size); + this(size, false); + } + + PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) { + encoder = new MessageExtEncoder(size, enabledAppendPropCRC); keyBuilder = new StringBuilder(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 028facbdc..8cb3ea6e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -270,6 +270,12 @@ public class MessageStoreConfig { */ private boolean autoMessageVersionOnTopicLen = true; + /** + * It cannot be changed after the broker is started. + * Modifications need to be restarted to take effect. + */ + private boolean enabledAppendPropCRC = false; + private boolean forceVerifyPropCRC = false; private int travelCqFileNumWhenGetMessage = 1; // Sleep interval between to corrections private int correctLogicMinOffsetSleepInterval = 1; @@ -405,6 +411,14 @@ public class MessageStoreConfig { private int topicQueueLockNum = 32; + public boolean isEnabledAppendPropCRC() { + return enabledAppendPropCRC; + } + + public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) { + this.enabledAppendPropCRC = enabledAppendPropCRC; + } + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -640,6 +654,15 @@ public class MessageStoreConfig { this.checkCRCOnRecover = checkCRCOnRecover; } + public boolean isForceVerifyPropCRC() { + return forceVerifyPropCRC; + } + + public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) { + this.forceVerifyPropCRC = forceVerifyPropCRC; + } + + public String getStorePathCommitLog() { if (storePathCommitLog == null) { return storePathRootDir + File.separator + "commitlog"; diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java new file mode 100644 index 000000000..c8ed4d74d --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AppendPropCRCTest { + + AppendMessageCallback callback; + + MessageExtEncoder encoder; + + CommitLog commitLog; + + @Before + public void init() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); + messageStoreConfig.setForceVerifyPropCRC(true); + messageStoreConfig.setEnabledAppendPropCRC(true); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); + commitLog = new CommitLog(messageStore); + encoder = new MessageExtEncoder(10 * 1024 * 1024, true); + callback = commitLog.new DefaultAppendMessageCallback(); + } + + @After + public void destroy() { + UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); + } + + @Test + public void testAppendMessageSucc() throws Exception { + String topic = "test-topic"; + int queue = 0; + int msgNum = 10; + int propertiesLen = 0; + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(queue); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBrokerInner.setBody(msg.getBody()); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + propertiesLen = messageExtBrokerInner.getPropertiesString().length(); + + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + for (int i = 0; i < msgNum; i++) { + encoder.encode(messageExtBrokerInner); + messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer()); + AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null); + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + } + // Expected to pass when message is not modified + buff.flip(); + for (int i = 0; i < msgNum - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // Modify the properties of the last message and expect the verification to fail. + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } + + @Test + public void testAppendMessageBatchSucc() throws Exception { + List<Message> messages = new ArrayList<>(); + String topic = "test-topic"; + int queue = 0; + int propertiesLen = 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties()); + propertiesLen = propertiesString.length(); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + //encounter end of file when append half of the data + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); + + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + assertEquals(0, allresult.getWroteOffset()); + assertEquals(0, allresult.getLogicsOffset()); + assertEquals(buff.position(), allresult.getWroteBytes()); + + assertEquals(messages.size(), allresult.getMsgNum()); + + Set<String> msgIds = new HashSet<>(); + for (String msgId : allresult.getMsgId().split(",")) { + assertEquals(32, msgId.length()); + msgIds.add(msgId); + } + assertEquals(messages.size(), msgIds.size()); + + List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); + assertEquals(decodeMsgs.size(), decodeMsgs.size()); + long queueOffset = decodeMsgs.get(0).getQueueOffset(); + long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); + for (int i = 0; i < messages.size(); i++) { + assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); + assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); + assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); + + assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); + + assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); + assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); + assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); + } + + // Expected to pass when message is not modified + buff.flip(); + for (int i = 0; i < messages.size() - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // Modify the properties of the last message and expect the verification to fail. + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 43ca38eb4..768029ca1 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -108,7 +108,7 @@ public class BatchPutMessageTest { short propertiesLength = (short) propertiesBytes.length; final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1]; + msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1]; j++; } byte[] batchMessageBody = MessageDecoder.encodeMessages(messages); diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java new file mode 100644 index 000000000..415dc3811 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageExtBrokerInnerTest { + @Test + public void testDeleteProperty() { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + String propertiesString = ""; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); + + propertiesString = "__CRC32#\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("__CRC32#"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty(); + + propertiesString = "__CRC32#"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("__CRC32#"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString); + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index db5c5af4c..7d659d2f6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -465,7 +465,7 @@ public class AutoSwitchHATest { // Step2: check flag SynchronizingSyncStateSet Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet()); - Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570); + Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580); Set<Long> syncStateSet = masterHAService.getSyncStateSet(); Assert.assertEquals(syncStateSet.size(), 2); Assert.assertTrue(syncStateSet.contains(1L)); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java index 2e028ada3..588424304 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java @@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest { ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer(); AppendResult result = flatFile.appendCommitLog(message); Assert.assertEquals(AppendResult.SUCCESS, result); - Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); + Assert.assertEquals(123L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition()); flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java index 1f38d4f6c..a413f2113 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java @@ -47,7 +47,7 @@ public class MessageBufferUtilTest { + 8 //Prepared Transaction Offset + 4 + 0 //BODY + 2 + 0 //TOPIC - + 2 + 30 //properties + + 2 + 31 //properties + 0; public static ByteBuffer buildMockedMessageBuffer() { -- 2.32.0.windows.2 From 48ef5ced4639699e3ba207b1a648b1fd47649a69 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Thu, 26 Oct 2023 14:43:24 +0800 Subject: [PATCH 2/2] [ISSUE #7505] Do not validate the length when deleting a topic --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0b7a6d206..004bf12ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -518,12 +518,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); String topic = requestHeader.getTopic(); - TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); - if (!result.isValid()) { + + if (UtilAll.isBlank(topic)) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(result.getRemark()); + response.setRemark("The specified topic is blank."); return response; } + if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -2726,7 +2727,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), - brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); + brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); response.setBody(entryCache.encode()); response.setCode(ResponseCode.SUCCESS); -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2