Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch032-backport-Clear-POP_CK...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch032-backport-Clear-POP_CK-when-sending-messages.patch of Package rocketmq
From 26fa0501482bbf31c2a64a33f329ab9744ac3800 Mon Sep 17 00:00:00 2001 From: fuyou001 <yubao.fyb@alibaba-inc.com> Date: Fri, 27 Oct 2023 16:28:17 +0800 Subject: [PATCH 1/3] [ISSUE #7501] The broker supports idempotence in creating topics (#7502) --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 004bf12ac..fbba6633b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -440,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { + LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", + requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + response.setCode(ResponseCode.SUCCESS); + return response; + } + try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { -- 2.32.0.windows.2 From 46962c262c37554ff09afe9e02c7baf66a5ecc73 Mon Sep 17 00:00:00 2001 From: fujian-zfj <2573259572@qq.com> Date: Thu, 2 Nov 2023 13:47:16 +0800 Subject: [PATCH 2/3] [ISSUE #7523] Message will flush timeout when transientStorePoolEnable=true and flushDiskType=SYNC_FLUSH (#7524) * typo int readme[ecosystem] * enableTransientPool and sync_flush will cause flush_time_out * polish * add log --- .../org/apache/rocketmq/store/CommitLog.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3d3ee86b8..6c3afde70 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1634,12 +1634,21 @@ public class CommitLog implements Swappable { private void doCommit() { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - for (int i = 0; i < 2 && !flushOK; i++) { + for (int i = 0; i < 1000 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + if (flushOK) { + break; + } else { + // When transientStorePoolEnable is true, the messages in writeBuffer may not be committed + // to pageCache very quickly, and flushOk here may almost be false, so we can sleep 1ms to + // wait for the messages to be committed to pageCache. + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + } + } } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); @@ -1846,7 +1855,7 @@ public class CommitLog implements Swappable { // Record ConsumeQueue information Long queueOffset = msgInner.getQueueOffset(); - // this msg maybe a inner-batch msg. + // this msg maybe an inner-batch msg. short messageNum = getMessageNum(msgInner); // Transaction messages that require special handling -- 2.32.0.windows.2 From 00965d8c11833237d5c9cd925664a1c456493cee Mon Sep 17 00:00:00 2001 From: lk <xdkxlk@outlook.com> Date: Mon, 6 Nov 2023 09:46:39 +0800 Subject: [PATCH 3/3] [ISSUE #7531] Clear POP_CK when sending messages (#7532) --- .../processor/SendMessageProcessor.java | 9 ++ .../common/message/MessageExtBrokerInner.java | 44 +------- .../rocketmq/common/utils/MessageUtils.java | 48 +++++++++ .../pop/PopMessageAndForwardingIT.java | 102 ++++++++++++++++++ 4 files changed, 161 insertions(+), 42 deletions(-) create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 9625689a8..956ef43fb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; +import org.apache.rocketmq.common.utils.MessageUtils; import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; @@ -106,6 +107,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } RemotingCommand response; + clearReservedProperties(requestHeader); + if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext, (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1)); @@ -131,6 +134,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return false; } + private void clearReservedProperties(SendMessageRequestHeader requestHeader) { + String properties = requestHeader.getProperties(); + properties = MessageUtils.deleteProperty(properties, MessageConst.PROPERTY_POP_CK); + requestHeader.setProperties(properties); + } + /** * If the response is not null, it meets some errors * diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 4e5d3419a..52501dbca 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -19,9 +19,7 @@ package org.apache.rocketmq.common.message; import java.nio.ByteBuffer; import org.apache.rocketmq.common.TopicFilterType; - -import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; -import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; +import org.apache.rocketmq.common.utils.MessageUtils; public class MessageExtBrokerInner extends MessageExt { private static final long serialVersionUID = 7256001576878700634L; @@ -62,45 +60,7 @@ public class MessageExtBrokerInner extends MessageExt { public void deleteProperty(String name) { super.clearProperty(name); if (propertiesString != null) { - int idx0 = 0; - int idx1; - int idx2; - idx1 = propertiesString.indexOf(name, idx0); - if (idx1 != -1) { - // cropping may be required - StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); - while (true) { - int startIdx = idx0; - while (true) { - idx1 = propertiesString.indexOf(name, startIdx); - if (idx1 == -1) { - break; - } - startIdx = idx1 + name.length(); - if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { - if (propertiesString.length() > idx1 + name.length() - && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { - break; - } - } - } - if (idx1 == -1) { - // there are no characters that need to be skipped. Append all remaining characters. - stringBuilder.append(propertiesString, idx0, propertiesString.length()); - break; - } - // there are characters that need to be cropped - stringBuilder.append(propertiesString, idx0, idx1); - // move idx2 to the end of the cropped character - idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); - // all subsequent characters will be cropped - if (idx2 == -1) { - break; - } - idx0 = idx2 + 1; - } - this.setPropertiesString(stringBuilder.toString()); - } + this.setPropertiesString(MessageUtils.deleteProperty(propertiesString, name)); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java index 4d6a150ad..a6563bc92 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java @@ -25,6 +25,9 @@ import com.google.common.hash.Hashing; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + public class MessageUtils { public static int getShardingKeyIndex(String shardingKey, int indexSize) { @@ -47,4 +50,49 @@ public class MessageUtils { } return indexSet; } + + public static String deleteProperty(String propertiesString, String name) { + if (propertiesString != null) { + int idx0 = 0; + int idx1; + int idx2; + idx1 = propertiesString.indexOf(name, idx0); + if (idx1 != -1) { + // cropping may be required + StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); + while (true) { + int startIdx = idx0; + while (true) { + idx1 = propertiesString.indexOf(name, startIdx); + if (idx1 == -1) { + break; + } + startIdx = idx1 + name.length(); + if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { + if (propertiesString.length() > idx1 + name.length() + && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { + break; + } + } + } + if (idx1 == -1) { + // there are no characters that need to be skipped. Append all remaining characters. + stringBuilder.append(propertiesString, idx0, propertiesString.length()); + break; + } + // there are characters that need to be cropped + stringBuilder.append(propertiesString, idx0, idx1); + // move idx2 to the end of the cropped character + idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); + // all subsequent characters will be cropped + if (idx2 == -1) { + break; + } + idx0 = idx2 + 1; + } + return stringBuilder.toString(); + } + } + return propertiesString; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java new file mode 100644 index 000000000..52a0c277c --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.pop; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.constant.ConsumeInitMode; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQPopClient; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class PopMessageAndForwardingIT extends BasePop { + + protected String topic; + protected String group; + protected RMQNormalProducer producer = null; + protected RMQPopClient client = null; + protected String broker1Addr; + protected MessageQueue broker1MessageQueue; + protected String broker2Addr; + protected MessageQueue broker2MessageQueue; + + @Before + public void setUp() { + broker1Addr = brokerController1.getBrokerAddr(); + broker2Addr = brokerController2.getBrokerAddr(); + topic = MQRandomUtils.getRandomTopic(); + group = initConsumerGroup(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER2_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); + producer = getProducer(NAMESRV_ADDR, topic); + client = getRMQPopClient(); + broker1MessageQueue = new MessageQueue(topic, BROKER1_NAME, -1); + broker2MessageQueue = new MessageQueue(topic, BROKER2_NAME, -1); + } + + @Test + public void test() { + producer.send(1, broker1MessageQueue); + + AtomicReference<MessageExt> firstMessageExtRef = new AtomicReference<>(); + await().atMost(Duration.ofSeconds(3)).until(() -> { + PopResult popResult = client.popMessageAsync(broker1Addr, broker1MessageQueue, 3000, 32, group, 1000, + true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*").get(); + if (!popResult.getPopStatus().equals(PopStatus.FOUND)) { + return false; + } + firstMessageExtRef.set(popResult.getMsgFoundList().get(0)); + return true; + }); + + producer.sendMQ(firstMessageExtRef.get(), broker2MessageQueue); + AtomicReference<MessageExt> secondMessageExtRef = new AtomicReference<>(); + await().atMost(Duration.ofSeconds(3)).until(() -> { + PopResult popResult = client.popMessageAsync(broker2Addr, broker2MessageQueue, 3000, 32, group, 1000, + true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*").get(); + if (!popResult.getPopStatus().equals(PopStatus.FOUND)) { + return false; + } + secondMessageExtRef.set(popResult.getMsgFoundList().get(0)); + return true; + }); + + assertEquals(firstMessageExtRef.get().getMsgId(), secondMessageExtRef.get().getMsgId()); + String firstPopCk = firstMessageExtRef.get().getProperty(MessageConst.PROPERTY_POP_CK); + String secondPopCk = secondMessageExtRef.get().getProperty(MessageConst.PROPERTY_POP_CK); + assertNotEquals(firstPopCk, secondPopCk); + assertEquals(BROKER1_NAME, ExtraInfoUtil.getBrokerName(ExtraInfoUtil.split(firstPopCk))); + assertEquals(BROKER2_NAME, ExtraInfoUtil.getBrokerName(ExtraInfoUtil.split(secondPopCk))); + } +} -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2