Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch037-backport-Retry-topic-...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch037-backport-Retry-topic-v2-in-pop.patch of Package rocketmq
From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001 From: zhanghong <985492783@qq.com> Date: Tue, 21 Nov 2023 14:03:24 +0800 Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter (#7463) --- .../impl/producer/DefaultMQProducerImpl.java | 9 +++---- .../client/producer/DefaultMQProducer.java | 16 ----------- .../producer/LocalTransactionExecuter.java | 27 ------------------- .../rocketmq/client/producer/MQProducer.java | 3 --- .../producer/TransactionMQProducer.java | 16 ----------- .../client.producer.DefaultMQProducer.schema | 1 - 6 files changed, 4 insertions(+), 68 deletions(-) delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index b0c212e46..545f17d93 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.Resolver; import org.apache.rocketmq.client.latency.ServiceDetector; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.RequestCallback; @@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter localTransactionExecuter, final Object arg) + final TransactionListener localTransactionListener, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); - if (null == localTransactionExecuter && null == transactionListener) { + if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } @@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } - if (null != localTransactionExecuter) { - localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); + if (null != localTransactionListener) { + localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg); } else { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index c5b1b5223..7bd3876f5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.sendOneway(msg, selector, arg); } - /** - * This method is to send transactional messages. - * - * @param msg Transactional message to send. - * @param tranExecuter local transaction executor. - * @param arg Argument used along with local transaction executor. - * @return Transaction result. - * @throws MQClientException if there is any client error. - */ - @Override - public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, - final Object arg) - throws MQClientException { - throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); - } - /** * This method is used to send transactional messages. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java deleted file mode 100644 index 267ba10bd..000000000 --- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.producer; - -import org.apache.rocketmq.common.message.Message; - -/** - * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. - */ -@Deprecated -public interface LocalTransactionExecuter { - LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); -} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 78657e623..8bd30e98d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin { void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; - TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; - TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index baa8b4408..d529f3e77 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer { this.defaultMQProducerImpl.destroyTransactionEnv(); } - /** - * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>} - * is recommended. - */ - @Override - @Deprecated - public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { - if (null == this.transactionCheckListener) { - throw new MQClientException("localTransactionBranchCheckListener is null", null); - } - - msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); - return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); - } - @Override public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema index 0418c73fe..d1111fb45 100644 --- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema +++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema @@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void) Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult) Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult) -Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void) Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void) -- 2.32.0.windows.2 From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001 From: panzhi <panzhi33@qq.com> Date: Tue, 21 Nov 2023 20:55:35 +0800 Subject: [PATCH 2/3] transactionProducer get the topic route before sending the message (#7569) --- .../impl/producer/DefaultMQProducerImpl.java | 15 +++++ .../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++ .../producer/TransactionMQProducer.java | 23 +++++-- .../transaction/TransactionProducer.java | 3 +- 4 files changed, 98 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 545f17d93..088bff089 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { mQClientFactory.start(); } + this.initTopicRoute(); + this.mqFaultStrategy.startDetector(); log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), @@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } + private void initTopicRoute() { + List<String> topics = this.defaultMQProducer.getTopics(); + if (topics != null && topics.size() > 0) { + topics.forEach(topic -> { + String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic); + TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic); + if (topicPublishInfo == null || !topicPublishInfo.ok()) { + log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO)); + } + }); + } + } + public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() { return topicPublishInfoTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 7bd3876f5..700e00aac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private String producerGroup; + /** + * Topics that need to be initialized for transaction producer + */ + private List<String> topics; + /** * Just for testing or demo program */ @@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } + /** + * Constructor specifying namespace, producer group, topics and RPC hook. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param topics Topic that needs to be initialized for routing + * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) { + this.namespace = namespace; + this.producerGroup = producerGroup; + this.topics = topics; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + } + /** * Constructor specifying producer group and enabled msg trace flag. * @@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } } + /** + * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic + * name. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param topics Topic that needs to be initialized for routing + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, + RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { + this.namespace = namespace; + this.producerGroup = producerGroup; + this.topics = topics; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + //if client open the message trace feature + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); + dispatcher.setHostProducer(this.defaultMQProducerImpl); + traceDispatcher = dispatcher; + this.defaultMQProducerImpl.registerSendMessageHook( + new SendMessageTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.registerEndTransactionHook( + new EndTransactionTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + @Override public void setUseTLS(boolean useTLS) { super.setUseTLS(useTLS); @@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize); } + public List<String> getTopics() { + return topics; + } + + public void setTopics(List<String> topics) { + this.topics = topics; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index d529f3e77..2c3b479f7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.producer; +import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; @@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer { } public TransactionMQProducer(final String producerGroup) { - this(null, producerGroup, null); + this(null, producerGroup, null, null); + } + + public TransactionMQProducer(final String producerGroup, final List<String> topics) { + this(null, producerGroup, topics, null); } public TransactionMQProducer(final String namespace, final String producerGroup) { - this(namespace, producerGroup, null); + this(namespace, producerGroup, null, null); + } + + public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) { + this(namespace, producerGroup, topics, null); } public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - this(null, producerGroup, rpcHook); + this(null, producerGroup, null, rpcHook); + } + + public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) { + this(null, producerGroup, topics, rpcHook); } - public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { - super(namespace, producerGroup, rpcHook); + public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) { + super(namespace, producerGroup, topics, rpcHook); } public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index 5973c3c30..d1d57c55e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -39,7 +40,7 @@ public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); - TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); + TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC)); // Uncomment the following line while debugging, namesrvAddr should be set to your local address // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); -- 2.32.0.windows.2 From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Thu, 23 Nov 2023 14:53:48 +0800 Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544) * Implement pop retry topic v2 * Use pop retry topic v2 to notify the origin topic * add parse group * retry topic v2 compatibility * calculate consumer lag * delete retry topic --- .../acl/plain/PlainAccessResource.java | 3 +- .../ExpressionForRetryMessageFilter.java | 3 +- .../NotifyMessageArrivingListener.java | 3 +- .../longpolling/PopLongPollingService.java | 10 +++ .../broker/metrics/ConsumerLagCalculator.java | 11 ++++ .../processor/AdminBrokerProcessor.java | 4 ++ .../processor/NotificationProcessor.java | 2 +- .../broker/processor/PopMessageProcessor.java | 24 ++++++- .../broker/processor/PopReviveService.java | 9 --- .../processor/SendMessageProcessor.java | 3 +- .../apache/rocketmq/common/BrokerConfig.java | 10 +++ .../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++-- .../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++ .../consumer/ConsumerProgressSubCommand.java | 3 +- .../tools/monitor/MonitorService.java | 3 +- 15 files changed, 168 insertions(+), 22 deletions(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index 72aa8ca71..1e185afff 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader; import org.apache.rocketmq.acl.common.AuthorizationHeader; import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; @@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource { if (retryTopic == null) { return null; } - return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + return KeyBuilder.parseGroup(retryTopic); } public static String getRetryTopic(String group) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index bc01b21cb..cc3e37bf4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageConst; @@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { tempProperties = MessageDecoder.decodeProperties(msgBuffer); } String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); - String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + String group = KeyBuilder.parseGroup(subscriptionData.getTopic()); realFilterData = this.consumerFilterManager.get(realTopic, group); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index 3c099fe2f..e55ed2778 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -17,12 +17,11 @@ package org.apache.rocketmq.broker.longpolling; +import java.util.Map; import org.apache.rocketmq.broker.processor.NotificationProcessor; import org.apache.rocketmq.broker.processor.PopMessageProcessor; import org.apache.rocketmq.store.MessageArrivingListener; -import java.util.Map; - public class NotifyMessageArrivingListener implements MessageArrivingListener { private final PullRequestHoldService pullRequestHoldService; private final PopMessageProcessor popMessageProcessor; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 113c91297..f1bc9adc4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread { } } + public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) { + String notifyTopic; + if (KeyBuilder.isPopRetryTopicV2(topic)) { + notifyTopic = KeyBuilder.parseNormalTopic(topic); + } else { + notifyTopic = topic; + } + notifyMessageArriving(notifyTopic, queueId); + } + public void notifyMessageArriving(final String topic, final int queueId) { ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic); if (cids == null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java index af08a83c7..d1f3fffde 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java @@ -185,6 +185,17 @@ public class ConsumerLagCalculator { continue; } } + if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); + TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1); + if (retryTopicConfigV1 != null) { + int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission(); + if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) { + consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1)); + continue; + } + } + } consumer.accept(new ProcessGroupInfo(group, topic, true, null)); } else { consumer.accept(new ProcessGroupInfo(group, topic, false, null)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index fbba6633b..863b275d1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { deleteTopicInBroker(popRetryTopic); } + final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { + deleteTopicInBroker(popRetryTopicV1); + } } // delete topic deleteTopicInBroker(topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index a15340383..91d275dfe 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor { } public void notifyMessageArriving(final String topic, final int queueId) { - popLongPollingService.notifyMessageArriving(topic, queueId); + popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 7ed4d53ab..58baecc05 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { } public void notifyMessageArriving(final String topic, final int queueId) { - popLongPollingService.notifyMessageArriving(topic, queueId); + popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); } public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) { @@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor { startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } + if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + TopicConfig retryTopicConfigV1 = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfigV1 != null) { + for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } + } } if (requestHeader.getQueueId() < 0) { // read all queue @@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor { startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } + if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + TopicConfig retryTopicConfigV1 = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfigV1 != null) { + for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } + } } final RemotingCommand finalResponse = response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 3fb689ed6..8d25bc57e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread { this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1); this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); - if (brokerController.getPopMessageProcessor() != null) { - brokerController.getPopMessageProcessor().notifyMessageArriving( - KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), - popCheckPoint.getCId(), - -1 - ); - brokerController.getNotificationProcessor().notifyMessageArriving( - KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1); - } return true; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 956ef43fb..4ec84c146 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.common.AbortProcessException; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) { String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + String groupName = KeyBuilder.parseGroup(newTopic); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 0d248c4e1..c186352d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean enablePopBatchAck = false; private boolean enableNotifyAfterPopOrderLockRelease = true; private boolean initPopOffsetByCheckMsgInMem = true; + // read message from pop retry topic v1, for the compatibility, will be removed in the future version + private boolean retrieveMessageFromPopRetryTopicV1 = true; private boolean realTimeNotifyConsumerChange = true; @@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity { this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem; } + public boolean isRetrieveMessageFromPopRetryTopicV1() { + return retrieveMessageFromPopRetryTopicV1; + } + + public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) { + this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1; + } + public boolean isRealTimeNotifyConsumerChange() { return realTimeNotifyConsumerChange; } diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java index e1532d939..f2a8c4089 100644 --- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -18,24 +18,53 @@ package org.apache.rocketmq.common; public class KeyBuilder { public static final int POP_ORDER_REVIVE_QUEUE = 999; + private static final String POP_RETRY_SEPARATOR_V1 = "_"; + private static final String POP_RETRY_SEPARATOR_V2 = ":"; public static String buildPopRetryTopic(String topic, String cid) { - return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic; + return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic; + } + + public static String buildPopRetryTopicV1(String topic, String cid) { + return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic; } public static String parseNormalTopic(String topic, String cid) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length()); + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) { + return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length()); + } + return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length()); } else { return topic; } } + public static String parseNormalTopic(String retryTopic) { + if (isPopRetryTopicV2(retryTopic)) { + String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2); + if (result.length == 2) { + return result[1]; + } + } + return retryTopic; + } + + public static String parseGroup(String retryTopic) { + if (isPopRetryTopicV2(retryTopic)) { + String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2); + if (result.length == 2) { + return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + } + } + return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + } + public static String buildPollingKey(String topic, String cid, int queueId) { return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId; } - public static String buildPollingNotificationKey(String topic, int queueId) { - return topic + PopAckConstants.SPLIT + queueId; + public static boolean isPopRetryTopicV2(String retryTopic) { + return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2); } } diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java new file mode 100644 index 000000000..f83e0aa14 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KeyBuilderTest { + String topic = "test-topic"; + String group = "test-group"; + + @Test + public void buildPopRetryTopic() { + assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic); + } + + @Test + public void buildPopRetryTopicV1() { + assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic); + } + + @Test + public void parseNormalTopic() { + String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic); + String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); + assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic); + } + + @Test + public void testParseNormalTopic() { + String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic); + } + + @Test + public void parseGroup() { + String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group); + } + + @Test + public void isPopRetryTopicV2() { + String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true); + String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); + assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false); + } +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index 97125b854..c489cad68 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand { TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + String consumerGroup = KeyBuilder.parseGroup(topic); try { ConsumeStats consumeStats = null; try { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 45dc3a036..b66dfad20 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -172,7 +173,7 @@ public class MonitorService { TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + String consumerGroup = KeyBuilder.parseGroup(topic); try { this.reportUndoneMsgs(consumerGroup); -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2