Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch007-backport-fix-some-bug...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch007-backport-fix-some-bugs.patch of Package rocketmq
From 90c5382aee07879a80309f257f04114201ccaac6 Mon Sep 17 00:00:00 2001 From: ShuangxiDing <dingshuangxi888@gmail.com> Date: Fri, 21 Jul 2023 20:28:58 +0800 Subject: [PATCH 01/10] [ISSUE #7061] Support forward HAProxyMessage for Multi Protocol server. (#7062) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC and Remoting server. * 回滚netty的升级 * Support proxy protocol for gRPC and Remoting server. * Support proxy protocol for gRPC and Remoting server. * Support proxy protocol for gRPC and Remoting server. * add grpc-netty-codec-haproxy in bazel * add grpc-netty-codec-haproxy in bazel * Support proxy protocol for gRPC and Remoting server. * Fix Test * add grpc-netty-codec-haproxy in bazel * add ProxyProtocolTest for Remoting * Support HAProxyMessage forward for multi protocol server. --------- Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com> --- .../http2proxy/HAProxyMessageForwarder.java | 129 ++++++++++++++++++ .../http2proxy/Http2ProtocolProxyHandler.java | 23 +++- .../http2proxy/Http2ProxyBackendHandler.java | 2 + .../http2proxy/Http2ProxyFrontendHandler.java | 28 ++-- 4 files changed, 164 insertions(+), 18 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java new file mode 100644 index 000000000..8f139d3d9 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.haproxy.HAProxyCommand; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import io.netty.handler.codec.haproxy.HAProxyTLV; +import io.netty.util.Attribute; +import io.netty.util.DefaultAttributeMap; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.acl.common.AclUtils; +import org.apache.rocketmq.common.constant.HAProxyConstants; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.netty.AttributeKeys; + +import java.lang.reflect.Field; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + + private static final Field FIELD_ATTRIBUTE = + FieldUtils.getField(DefaultAttributeMap.class, "attributes", true); + + private final Channel outboundChannel; + + public HAProxyMessageForwarder(final Channel outboundChannel) { + this.outboundChannel = outboundChannel; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + forwardHAProxyMessage(ctx.channel(), outboundChannel); + ctx.fireChannelRead(msg); + } catch (Exception e) { + log.error("Forward HAProxyMessage from Remoting to gRPC server error.", e); + throw e; + } finally { + ctx.pipeline().remove(this); + } + } + + private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChannel) throws Exception { + if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { + return; + } + + if (!(inboundChannel instanceof DefaultAttributeMap)) { + return; + } + + Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel); + if (ArrayUtils.isEmpty(attributes)) { + return; + } + + String sourceAddress = null, destinationAddress = null; + int sourcePort = 0, destinationPort = 0; + List<HAProxyTLV> haProxyTLVs = new ArrayList<>(); + + for (Attribute<?> attribute : attributes) { + String attributeKey = attribute.key().name(); + if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_PREFIX)) { + continue; + } + String attributeValue = (String) attribute.get(); + if (StringUtils.isEmpty(attributeValue)) { + continue; + } + if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) { + sourceAddress = attributeValue; + } + if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) { + sourcePort = Integer.parseInt(attributeValue); + } + if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) { + destinationAddress = attributeValue; + } + if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) { + destinationPort = Integer.parseInt(attributeValue); + } + if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) { + String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX); + ByteBuf byteBuf = Unpooled.buffer(); + byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset())); + HAProxyTLV haProxyTLV = new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf); + haProxyTLVs.add(haProxyTLV); + } + } + + HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 : + HAProxyProxiedProtocol.TCP4; + + HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, + proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs); + outboundChannel.writeAndFlush(message).sync(); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java index 913f35c93..c37db92af 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java @@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import javax.net.ssl.SSLException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; import org.apache.rocketmq.remoting.common.TlsMode; +import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; +import javax.net.ssl.SSLException; + public class Http2ProtocolProxyHandler implements ProtocolHandler { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final String LOCAL_HOST = "127.0.0.1"; @@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { - if (sslContext != null) { - ch.pipeline() - .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort())); - } - ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel)); + ch.pipeline().addLast(null, Http2ProxyBackendHandler.HANDLER_NAME, + new Http2ProxyBackendHandler(inboundChannel)); } }) .option(ChannelOption.AUTO_READ, false) @@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { } final Channel outboundChannel = f.channel(); + if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { + ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel)); + outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE); + } - ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel)); + SslHandler sslHandler = null; + if (sslContext != null) { + sslHandler = sslContext.newHandler(outboundChannel.alloc(), LOCAL_HOST, config.getGrpcServerPort()); + } + ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler)); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java index 0195b0c1c..fd5408fae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java @@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + public static final String HANDLER_NAME = "Http2ProxyBackendHandler"; + private final Channel inboundChannel; public Http2ProxyBackendHandler(Channel inboundChannel) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java index 87147a322..9b37e85e5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java @@ -19,36 +19,42 @@ package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.ssl.SslHandler; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + + public static final String HANDLER_NAME = "SslHandler"; + // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. private final Channel outboundChannel; + private final SslHandler sslHandler; - public Http2ProxyFrontendHandler(final Channel outboundChannel) { + public Http2ProxyFrontendHandler(final Channel outboundChannel, final SslHandler sslHandler) { this.outboundChannel = outboundChannel; + this.sslHandler = sslHandler; } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) { if (outboundChannel.isActive()) { - outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (future.isSuccess()) { - // was able to flush out data, start to read the next chunk - ctx.channel().read(); - } else { - future.channel().close(); - } + if (sslHandler != null && outboundChannel.pipeline().get(HANDLER_NAME) == null) { + outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, HANDLER_NAME, sslHandler); + } + + outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + // was able to flush out data, start to read the next chunk + ctx.channel().read(); + } else { + future.channel().close(); } }); } -- 2.32.0.windows.2 From 8027cfc7cbb6c120d2fc045e0caa8debe1028a31 Mon Sep 17 00:00:00 2001 From: maclong1989 <814742806@qq.com> Date: Sun, 23 Jul 2023 09:15:05 +0800 Subject: [PATCH 02/10] [ISSUE #7063] doc: fix typo in user_guide.md Signed-off-by: jiangyl3 <jiangyl3@asiainfo.com> Co-authored-by: jiangyl3 <jiangyl3@asiainfo.com> --- docs/cn/msg_trace/user_guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md index d8314052b..9cf139fd3 100644 --- a/docs/cn/msg_trace/user_guide.md +++ b/docs/cn/msg_trace/user_guide.md @@ -35,7 +35,7 @@ namesrvAddr=XX.XX.XX.XX:9876 RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。 ### 2.3 物理IO隔离模式 -对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。 +对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。 ### 2.4 启动开启消息轨迹的Broker `nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &` -- 2.32.0.windows.2 From 3102758487f3e21e977424d7f1b7187eb6c069cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=98=9F=E7=81=BF?= <37405937+wuyoudexiao@users.noreply.github.com> Date: Tue, 25 Jul 2023 13:47:53 +0800 Subject: [PATCH 03/10] fix: npe in lockBatchMQ and unlockBatchMQ (#7078) Co-authored-by: wxc <wuxingcan666@foxmail.com> --- .../rocketmq/proxy/processor/ConsumerProcessor.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index cc973813b..656a6339d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -425,13 +426,15 @@ public class ConsumerProcessor extends AbstractProcessor { } protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) { - return mqSet.stream().map(mq -> { + Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size()); + for (MessageQueue mq:mqSet) { try { - return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq); + addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ; } catch (Exception e) { - return null; + log.error("build addressable message queue fail, messageQueue = {}", mq, e); } - }).collect(Collectors.toSet()); + } + return addressableMessageQueueSet; } protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName( -- 2.32.0.windows.2 From 047ef7498f2203a2234052603a99a114d8a65e17 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Tue, 25 Jul 2023 14:00:22 +0800 Subject: [PATCH 04/10] Ensuring consistency between broker and nameserver data when deleting a topic (#7066) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 尘央 <xinyuzhou.zxy@alibaba-inc.com> --- .../rocketmq/broker/BrokerController.java | 11 ++ .../rocketmq/broker/out/BrokerOuterAPI.java | 62 ++++++++++ .../processor/AdminBrokerProcessor.java | 26 ++-- .../broker/topic/TopicConfigManager.java | 6 +- .../apache/rocketmq/common/BrokerConfig.java | 14 +++ .../common/namesrv/NamesrvConfig.java | 17 +++ .../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++-- .../routeinfo/RouteInfoManagerNewTest.java | 99 +++++++++++++++ .../rocketmq/test/util/MQAdminTestUtils.java | 37 ++++++ .../dledger/DLedgerProduceAndConsumeIT.java | 2 +- .../test/route/CreateAndUpdateTopicIT.java | 114 ++++++++++++++++++ 11 files changed, 429 insertions(+), 23 deletions(-) rename test/src/test/java/org/apache/rocketmq/test/{base => }/dledger/DLedgerProduceAndConsumeIT.java (99%) create mode 100644 test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 196401e26..972457194 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1678,6 +1678,17 @@ public class BrokerController { }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS)); } + public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) { + TopicConfig tmpTopic = topicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + // Copy the topic config and modify the perm + tmpTopic = new TopicConfig(topicConfig); + tmpTopic.setPerm(topicConfig.getPerm() & this.brokerConfig.getBrokerPermission()); + } + this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), tmpTopic, 3000); + } + public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index b6273e9ed..1793a83c0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -120,12 +121,14 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.rpc.ClientMetadata; import org.apache.rocketmq.remoting.rpc.RpcClient; @@ -614,6 +617,65 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); } + /** + * Register the topic route info of single topic to all name server nodes. + * This method is used to replace incremental broker registration feature. + */ + public void registerSingleTopicAll( + final String brokerName, + final TopicConfig topicConfig, + final int timeoutMills) { + String topic = topicConfig.getTopicName(); + RegisterTopicRequestHeader requestHeader = new RegisterTopicRequestHeader(); + requestHeader.setTopic(topic); + + TopicRouteData topicRouteData = new TopicRouteData(); + List<QueueData> queueDatas = new ArrayList<>(); + topicRouteData.setQueueDatas(queueDatas); + + final QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setPerm(topicConfig.getPerm()); + queueData.setReadQueueNums(topicConfig.getReadQueueNums()); + queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); + queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); + queueDatas.add(queueData); + final byte[] topicRouteBody = topicRouteData.encode(); + + + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); + for (final String namesrvAddr : nameServerAddressList) { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, requestHeader); + request.setBody(topicRouteBody); + + try { + brokerOuterExecutor.execute(() -> { + try { + RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); + assert response != null; + LOGGER.info("Register single topic %s to broker %s with response code %s", topic, brokerName, response.getCode()); + } catch (Exception e) { + LOGGER.warn(String.format("Register single topic %s to broker %s exception", topic, brokerName), e); + } finally { + countDownLatch.countDown(); + } + }); + } catch (Exception e) { + LOGGER.warn("Execute single topic registration task failed, topic {}, broker name {}", topic, brokerName); + countDownLatch.countDown(); + } + + } + + try { + if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Registration single topic to one or more name servers timeout. Timeout threshold: {}ms", timeoutMills); + } + } catch (InterruptedException ignore) { + } + } + public List<Boolean> needRegister( final String clusterName, final String brokerAddr, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 892a71330..569a1c57b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); + } response.setCode(ResponseCode.SUCCESS); } catch (Exception e) { LOGGER.error("Update / create topic failed for [{}]", request, e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); } + return response; } @@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) { + private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, + RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } MessageStore messageStore = this.brokerController.getMessageStore(); if (messageStore instanceof DefaultMessageStore) { - DefaultMessageStore defaultMessageStore = (DefaultMessageStore)messageStore; + DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore; if (mode == LibC.MADV_NORMAL) { defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true); } else { @@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { /** * Reset consumer offset. * - * @param topic Required, not null. - * @param group Required, not null. - * @param queueId if target queue ID is negative, all message queues will be reset; - * otherwise, only the target queue would get reset. - * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; - * otherwise, binary search is performed to locate target offset. - * @param offset Target offset to reset to if target queue ID is properly provided. + * @param topic Required, not null. + * @param group Required, not null. + * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue + * would get reset. + * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise, + * binary search is performed to locate target offset. + * @param offset Target offset to reset to if target queue ID is properly provided. * @return Affected queues and their new offset */ private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index e5fdd8675..e90530512 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager { log.error("createTopicIfAbsent ", e); } if (createNew && register) { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } return this.topicConfigTable.get(topicConfig.getTopicName()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index a4d82d1c5..02c692e2b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity { */ private boolean popResponseReturnActualRetryTopic = false; + /** + * If both the deleteTopicWithBrokerRegistration flag in the NameServer configuration and this flag are set to true, + * it guarantees the ultimate consistency of data between the broker and the nameserver during topic deletion. + */ + private boolean enableSingleTopicRegister = false; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity { public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) { this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic; } + + public boolean isEnableSingleTopicRegister() { + return enableSingleTopicRegister; + } + + public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) { + this.enableSingleTopicRegister = enableSingleTopicRegister; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java index 700febfe2..5b8a6dedb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java @@ -82,6 +82,15 @@ public class NamesrvConfig { private int waitSecondsForService = 45; + /** + * If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server. + * + * WARNING: + * 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly. + * 2. This flag does not support static topic currently. + */ + private boolean deleteTopicWithBrokerRegistration = false; + public boolean isOrderMessageEnable() { return orderMessageEnable; } @@ -241,4 +250,12 @@ public class NamesrvConfig { public void setWaitSecondsForService(int waitSecondsForService) { this.waitSecondsForService = waitSecondsForService; } + + public boolean isDeleteTopicWithBrokerRegistration() { + return deleteTopicWithBrokerRegistration; + } + + public void setDeleteTopicWithBrokerRegistration(boolean deleteTopicWithBrokerRegistration) { + this.deleteTopicWithBrokerRegistration = deleteTopicWithBrokerRegistration; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index ac27d76ce..0055a1cc8 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -121,9 +121,18 @@ public class RouteInfoManager { if (queueDatas == null || queueDatas.isEmpty()) { return; } + try { this.lock.writeLock().lockInterruptibly(); if (this.topicQueueTable.containsKey(topic)) { + Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); + for (QueueData queueData : queueDatas) { + if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) { + log.warn("Register topic contains illegal broker, {}, {}", topic, queueData); + return; + } + queueDataMap.put(queueData.getBrokerName(), queueData); + } log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic)); } else { // check and construct queue data map @@ -299,7 +308,32 @@ public class RouteInfoManager { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); + if (tcTable != null) { + + TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); + Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); + + // Delete the topics that don't exist in tcTable from the current broker + // Static topic is not supported currently + if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) { + final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName); + final Set<String> newTopicSet = tcTable.keySet(); + final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet); + for (final String toDeleteTopic : toDeleteTopics) { + Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic); + final QueueData removedQD = queueDataMap.remove(brokerName); + if (removedQD != null) { + log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD); + } + + if (queueDataMap.isEmpty()) { + log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic); + topicQueueTable.remove(toDeleteTopic); + } + } + } + for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion(), brokerName, @@ -312,19 +346,17 @@ public class RouteInfoManager { this.createAndUpdateQueueData(brokerName, topicConfig); } } - } - if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { - TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); - Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); - //the topicQueueMappingInfoMap should never be null, but can be empty - for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) { - if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { - topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>()); + if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { + //the topicQueueMappingInfoMap should never be null, but can be empty + for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) { + if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { + topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>()); + } + //Note asset brokerName equal entry.getValue().getBname() + //here use the mappingDetail.bname + topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } - //Note asset brokerName equal entry.getValue().getBname() - //here use the mappingDetail.bname - topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } } } @@ -374,6 +406,16 @@ public class RouteInfoManager { return result; } + private Set<String> topicSetOfBrokerName(final String brokerName) { + Set<String> topicOfBroker = new HashSet<>(); + for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) { + if (entry.getValue().containsKey(brokerName)) { + topicOfBroker.add(entry.getKey()); + } + } + return topicOfBroker; + } + public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) { BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName); try { diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java index b53519e5f..6002d1f5a 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.junit.After; import org.junit.Before; @@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest { .containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, BrokerBasicInfo.slaveBroker().brokerAddr); } + @Test + public void keepTopicWithBrokerRegistration() { + RegisterBrokerResult masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + + masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + } + + @Test + public void deleteTopicWithBrokerRegistration() { + config.setDeleteTopicWithBrokerRegistration(true); + registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + + registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + } + + @Test + public void deleteTopicWithBrokerRegistration2() { + // Register two brokers and delete a specific one by one + config.setDeleteTopicWithBrokerRegistration(true); + final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker(); + final BrokerBasicInfo master2 = BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9); + + registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1"); + registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1"); + + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); + + + registerBrokerWithNormalTopic(master1, "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName()) + .isEqualTo(master2.brokerName); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); + + registerBrokerWithNormalTopic(master2, "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); + } + + @Test + public void registerSingleTopicWithBrokerRegistration() { + config.setDeleteTopicWithBrokerRegistration(true); + final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker(); + + registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic"); + + // Single topic registration failed because there is no broker connection exists + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); + + // Register broker with TestTopic first and then register single topic TestTopic1 + registerBrokerWithNormalTopic(master1, "TestTopic"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + + registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + + // Register the two topics to keep the route info + registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + + // Cancel the TestTopic1 with broker registration + registerBrokerWithNormalTopic(master1, "TestTopic"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull(); + + // Add TestTopic1 and cancel all the topics with broker un-registration + registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1"); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); + + routeInfoManager.unregisterBroker(master1.clusterName, master1.brokerAddr, master1.brokerName, 0); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull(); + + + } + private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) { ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); TopicConfig baseTopic = new TopicConfig("baseTopic"); @@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest { return registerBrokerResult; } + private void registerSingleTopicWithBrokerName(String brokerName, String... topics) { + for (final String topic : topics) { + QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setReadQueueNums(8); + queueData.setWriteQueueNums(8); + queueData.setPerm(6); + routeInfoManager.registerTopic(topic, Collections.singletonList(queueData)); + } + } + static class BrokerBasicInfo { String clusterName; String brokerName; diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 11b00a72c..d3d5de9e2 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.util; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; @@ -319,4 +321,39 @@ public class MQAdminTestUtils { } return consumeStats; } + + /** + * Delete topic from broker only without cleaning route info from name server forwardly + * + * @param nameSrvAddr the namesrv addr to connect + * @param brokerName the specific broker + * @param topic the specific topic to delete + */ + public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String brokerName, String topic) { + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + + try { + mqAdminExt.start(); + String brokerAddr = CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName); + mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), topic); + } catch (Exception ignored) { + } finally { + mqAdminExt.shutdown(); + } + } + + public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, String topicName) { + DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr(nameSrvAddr); + TopicRouteData route = null; + try { + mqAdminExt.start(); + route = mqAdminExt.examineTopicRouteInfo(topicName); + } catch (Exception ignored) { + } finally { + mqAdminExt.shutdown(); + } + return route; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java similarity index 99% rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java rename to test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java index 9e142eb61..43fefd616 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.test.base.dledger; +package org.apache.rocketmq.test.dledger; import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java new file mode 100644 index 000000000..7e3c7b871 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.route; + +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.util.MQAdminTestUtils; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CreateAndUpdateTopicIT extends BaseConf { + + @Test + public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() { + String topic = "test-topic-without-broker-registration"; + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); + + final boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null); + assertThat(createResult).isTrue(); + + TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic); + assertThat(route.getBrokerDatas()).hasSize(3); + assertThat(route.getQueueDatas()).hasSize(3); + + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); + + } + + @Test + public void testDeleteTopicFromNameSrvWithBrokerRegistration() { + namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true); + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); + + String testTopic1 = "test-topic-keep-route"; + String testTopic2 = "test-topic-delete-route"; + + boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null); + assertThat(createResult).isTrue(); + + + createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null); + assertThat(createResult).isTrue(); + + + TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2); + assertThat(route.getBrokerDatas()).hasSize(3); + + MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, testTopic2); + + // Deletion is lazy, trigger broker registration + brokerController1.registerBrokerAll(false, false, true); + + // The route info of testTopic2 will be removed from broker1 after the registration + route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2); + assertThat(route.getBrokerDatas()).hasSize(2); + assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME); + assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME); + + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); + namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); + } + + @Test + public void testStaticTopicNotAffected() throws Exception { + namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true); + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); + + String testTopic = "test-topic-not-affected"; + String testStaticTopic = "test-static-topic"; + + boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic, 8, null); + assertThat(createResult).isTrue(); + + TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic); + assertThat(route.getBrokerDatas()).hasSize(3); + assertThat(route.getQueueDatas()).hasSize(3); + + MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, null, CLUSTER_NAME, NAMESRV_ADDR); + + assertThat(route.getBrokerDatas()).hasSize(3); + assertThat(route.getQueueDatas()).hasSize(3); + + brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); + namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); + } +} -- 2.32.0.windows.2 From 32eb1d55570af81641a4a40d96ff5554329b93cb Mon Sep 17 00:00:00 2001 From: gaoyf <gaoyf@users.noreply.github.com> Date: Tue, 25 Jul 2023 15:26:20 +0800 Subject: [PATCH 05/10] [ISSUE #7068] Fix failed to create syncer topic when the proxy was just started (#7076) --- .../apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java index f7d9b11ba..c68859b28 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java @@ -104,6 +104,7 @@ public class MQClientAPIFactory implements StartAndShutdown { rpcHook); if (!mqClientAPIExt.updateNameServerAddressList()) { + mqClientAPIExt.fetchNameServerAddr(); this.scheduledExecutorService.scheduleAtFixedRate( mqClientAPIExt::fetchNameServerAddr, Duration.ofSeconds(10).toMillis(), -- 2.32.0.windows.2 From d79737788078707168c0258c4af0d800de32c137 Mon Sep 17 00:00:00 2001 From: Vincent Lee <cool8511@gmail.com> Date: Thu, 27 Jul 2023 10:51:51 +0800 Subject: [PATCH 06/10] [ISSUE #7056] Avoid close success channel if invokeSync most time cost on get connection for channel (#7057) * fix: avoid close success channel if invokeSync most time cost on get channel Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325 * fix: ci style Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8 --- .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 9715b918a..8491f4354 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; + private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100; private final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); @@ -524,13 +525,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); if (channel != null && channel.isActive()) { + long left = timeoutMillis; try { doBeforeRpcHooks(channelRemoteAddr, request); long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { + left -= costTime; + if (left <= 0) { throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout"); } - RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); + RemotingCommand response = this.invokeSyncImpl(channel, request, left); doAfterRpcHooks(channelRemoteAddr, request, response); this.updateChannelLastResponseTime(addr); return response; @@ -539,7 +542,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { - if (nettyClientConfig.isClientCloseSocketIfTimeout()) { + // avoid close the success channel if left timeout is small, since it may cost too much time in get the success channel, the left timeout for read is small + boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4; + if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) { this.closeChannel(addr, channel); LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr); } -- 2.32.0.windows.2 From d0a69be563785ca815dc31ef1aab4c1bc5588c01 Mon Sep 17 00:00:00 2001 From: zd46319 <zd46319@163.com> Date: Thu, 27 Jul 2023 16:56:41 +0800 Subject: [PATCH 07/10] [ISSUE #6810] Fix the bug of mistakenly deleting data in clientChannelTable when the channel expire (#7073) --- .../broker/client/ProducerManager.java | 5 ++- .../broker/client/ProducerManagerTest.java | 34 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 52d67bf28..f9fe1193e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -112,7 +112,10 @@ public class ProducerManager { long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); if (diff > CHANNEL_EXPIRED_TIMEOUT) { it.remove(); - clientChannelTable.remove(info.getClientId()); + Channel channelInClientTable = clientChannelTable.get(info.getClientId()); + if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) { + clientChannelTable.remove(info.getClientId()); + } log.warn( "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index dac5468c8..3d6091e02 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -27,6 +27,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -79,6 +80,39 @@ public class ProducerManagerTest { assertThat(producerManager.findChannel("clientId")).isNull(); } + @Test + public void scanNotActiveChannelWithSameClientId() throws Exception { + producerManager.registerProducer(group, clientInfo); + Channel channel1 = Mockito.mock(Channel.class); + ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0); + producerManager.registerProducer(group, clientInfo1); + AtomicReference<String> groupRef = new AtomicReference<>(); + AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>(); + producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> { + switch (event) { + case GROUP_UNREGISTER: + groupRef.set(group); + break; + case CLIENT_UNREGISTER: + clientChannelInfoRef.set(clientChannelInfo); + break; + default: + break; + } + }); + assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); + assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull(); + assertThat(producerManager.findChannel("clientId")).isNotNull(); + Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT"); + field.setAccessible(true); + long channelExpiredTimeout = field.getLong(producerManager); + clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10); + when(channel.close()).thenReturn(mock(ChannelFuture.class)); + producerManager.scanNotActiveChannel(); + assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull(); + assertThat(producerManager.findChannel("clientId")).isNotNull(); + } + @Test public void doChannelCloseEvent() throws Exception { producerManager.registerProducer(group, clientInfo); -- 2.32.0.windows.2 From d429bd72dfae0901f4325c8e9c6ce631286e40d4 Mon Sep 17 00:00:00 2001 From: cnScarb <jjhfen00@163.com> Date: Fri, 28 Jul 2023 09:46:39 +0800 Subject: [PATCH 08/10] [ISSUE #7039] Fix retry message filter when subtype is TAG (#7040) --- .../broker/filter/ExpressionForRetryMessageFilter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index d2d1087ef..bc01b21cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -45,12 +45,12 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { return true; } - boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); - - if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) { + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { return true; } + boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + ConsumerFilterData realFilterData = this.consumerFilterData; Map<String, String> tempProperties = properties; boolean decoded = false; -- 2.32.0.windows.2 From 8baa51e85e569429293720b2ba7fcaee745abecc Mon Sep 17 00:00:00 2001 From: Zack_Aayush <60972989+AayushSaini101@users.noreply.github.com> Date: Sun, 30 Jul 2023 09:02:02 +0530 Subject: [PATCH 09/10] [ISSUE #7091] Update the cd command in README (#7096) * Update the cd command * Removed extra space --------- Co-authored-by: Aayush <aaayush@redhat.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 393ef88e6..56d253ce1 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ $ unzip rocketmq-all-5.1.3-bin-release.zip Prepare a terminal and change to the extracted `bin` directory: ```shell -$ cd rocketmq-all-5.1.3/bin +$ cd rocketmq-all-5.1.3-bin-release/bin ``` **1) Start NameServer** -- 2.32.0.windows.2 From 8bcc94829d2ef2597a8eeab3c6b7099432a0bea1 Mon Sep 17 00:00:00 2001 From: weihubeats <weihubeats@163.com> Date: Tue, 1 Aug 2023 10:15:07 +0800 Subject: [PATCH 10/10] [ISSUE #7077] Schedule CQ offset invalid. offset=77, cqMinOffset=0, cqMaxOffset=74, queueId=1 (#7084) * Adding null does not update * delete slave put correctDelayOffset * Remove duplicate delayOffset file loading * add loadWhenSyncDelayOffset * add method * add method --- .../rocketmq/broker/schedule/ScheduleMessageService.java | 6 ++++++ .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 2a4ace098..26f09dcd0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -223,6 +223,12 @@ public class ScheduleMessageService extends ConfigManager { result = result && this.correctDelayOffset(); return result; } + + public boolean loadWhenSyncDelayOffset() { + boolean result = super.load(); + result = result && this.parseDelayLevel(); + return result; + } public boolean correctDelayOffset() { try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index b9de5173b..53cdecdf8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -152,7 +152,7 @@ public class SlaveSynchronize { .getMessageStoreConfig().getStorePathRootDir()); try { MixAll.string2File(delayOffset, fileName); - this.brokerController.getScheduleMessageService().load(); + this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset(); } catch (IOException e) { LOGGER.error("Persist file Exception, {}", fileName, e); } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2