Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch014-backport-Queue-Select...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch014-backport-Queue-Selection-Strategy-Optimization.patch of Package rocketmq
From b028277018946868838a82a08211071bc231a175 Mon Sep 17 00:00:00 2001 From: Ji Juntao <juntao.jjt@alibaba-inc.com> Date: Tue, 29 Aug 2023 16:13:38 +0800 Subject: [PATCH] [ISSUE #6567] [RIP-63] Queue Selection Strategy Optimization (#6568) Optimize the proxy's and client's selection strategy for brokers when sending messages, and use multiple selection strategies as a pipeline to filter suitable queues. --- .../apache/rocketmq/client/ClientConfig.java | 54 +++++ .../client/common/ThreadLocalIndex.java | 8 + .../rocketmq/client/impl/MQClientAPIImpl.java | 12 +- .../client/impl/factory/MQClientInstance.java | 7 + .../impl/producer/DefaultMQProducerImpl.java | 87 ++++++-- .../impl/producer/TopicPublishInfo.java | 40 ++++ .../client/latency/LatencyFaultTolerance.java | 66 +++++- .../latency/LatencyFaultToleranceImpl.java | 189 ++++++++++++++---- .../client/latency/MQFaultStrategy.java | 155 ++++++++++---- .../rocketmq/client/latency/Resolver.java | 17 +- .../client/latency/ServiceDetector.java | 30 +++ .../LatencyFaultToleranceImplTest.java | 36 +++- .../processor/DefaultRequestProcessor.java | 24 --- .../rocketmq/proxy/config/ProxyConfig.java | 46 +++++ .../grpc/v2/producer/SendMessageActivity.java | 2 +- .../proxy/processor/ProducerProcessor.java | 18 +- .../service/route/LocalTopicRouteService.java | 2 +- .../service/route/MessageQueueSelector.java | 95 ++++++++- .../proxy/service/route/MessageQueueView.java | 18 +- .../service/route/TopicRouteService.java | 80 +++++++- .../consumer/ReceiveMessageActivityTest.java | 5 +- .../v2/producer/SendMessageActivityTest.java | 82 +++++++- .../proxy/service/BaseServiceTest.java | 4 +- .../route/MessageQueueSelectorTest.java | 8 +- .../sysmessage/HeartbeatSyncerTest.java | 2 +- .../ClusterTransactionServiceTest.java | 8 +- 26 files changed, 919 insertions(+), 176 deletions(-) rename remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java => client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java (65%) create mode 100644 client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index f87450f66..bb0fe3522 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -38,6 +38,8 @@ public class ClientConfig { public static final String SOCKS_PROXY_CONFIG = "com.rocketmq.socks.proxy.config"; public static final String DECODE_READ_BODY = "com.rocketmq.read.body"; public static final String DECODE_DECOMPRESS_BODY = "com.rocketmq.decompress.body"; + public static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable"; + public static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable"; public static final String HEART_BEAT_V2 = "com.rocketmq.heartbeat.v2"; private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(); private String clientIP = NetworkUtil.getLocalAddress(); @@ -72,6 +74,8 @@ public class ClientConfig { private String socksProxyConfig = System.getProperty(SOCKS_PROXY_CONFIG, "{}"); private int mqClientApiTimeout = 3 * 1000; + private int detectTimeout = 200; + private int detectInterval = 2 * 1000; private LanguageCode language = LanguageCode.JAVA; @@ -81,6 +85,15 @@ public class ClientConfig { */ protected boolean enableStreamRequestType = false; + /** + * Enable the fault tolerance mechanism of the client sending process. + * DO NOT OPEN when ORDER messages are required. + * Turning on will interfere with the queue selection functionality, + * possibly conflicting with the order message. + */ + private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false")); + private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false")); + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -186,6 +199,10 @@ public class ClientConfig { this.decodeDecompressBody = cc.decodeDecompressBody; this.enableStreamRequestType = cc.enableStreamRequestType; this.useHeartbeatV2 = cc.useHeartbeatV2; + this.startDetectorEnable = cc.startDetectorEnable; + this.sendLatencyEnable = cc.sendLatencyEnable; + this.detectInterval = cc.detectInterval; + this.detectTimeout = cc.detectTimeout; } public ClientConfig cloneClientConfig() { @@ -210,6 +227,10 @@ public class ClientConfig { cc.decodeDecompressBody = decodeDecompressBody; cc.enableStreamRequestType = enableStreamRequestType; cc.useHeartbeatV2 = useHeartbeatV2; + cc.startDetectorEnable = startDetectorEnable; + cc.sendLatencyEnable = sendLatencyEnable; + cc.detectInterval = detectInterval; + cc.detectTimeout = detectTimeout; return cc; } @@ -381,6 +402,38 @@ public class ClientConfig { this.enableStreamRequestType = enableStreamRequestType; } + public boolean isSendLatencyEnable() { + return sendLatencyEnable; + } + + public void setSendLatencyEnable(boolean sendLatencyEnable) { + this.sendLatencyEnable = sendLatencyEnable; + } + + public boolean isStartDetectorEnable() { + return startDetectorEnable; + } + + public void setStartDetectorEnable(boolean startDetectorEnable) { + this.startDetectorEnable = startDetectorEnable; + } + + public int getDetectTimeout() { + return this.detectTimeout; + } + + public void setDetectTimeout(int detectTimeout) { + this.detectTimeout = detectTimeout; + } + + public int getDetectInterval() { + return this.detectInterval; + } + + public void setDetectInterval(int detectInterval) { + this.detectInterval = detectInterval; + } + public boolean isUseHeartbeatV2() { return useHeartbeatV2; } @@ -403,6 +456,7 @@ public class ClientConfig { + ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody + + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable + ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java index 4a3d90135..3a086c13d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java +++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java @@ -33,6 +33,14 @@ public class ThreadLocalIndex { return index & POSITIVE_MASK; } + public void reset() { + int index = Math.abs(random.nextInt(Integer.MAX_VALUE)); + if (index < 0) { + index = 0; + } + this.threadLocalIndex.set(index); + } + @Override public String toString() { return "ThreadLocalIndex{" + diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 213c26fd6..3201a493f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -666,7 +666,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { } catch (Throwable e) { } - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); return; } @@ -684,14 +684,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { } catch (Throwable e) { } - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); } catch (Exception e) { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, @@ -711,7 +711,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { }); } catch (Exception ex) { long cost = System.currentTimeMillis() - beginStartTime; - producer.updateFaultItem(brokerName, cost, true); + producer.updateFaultItem(brokerName, cost, true, false); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } @@ -735,7 +735,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { if (needRetry && tmp <= timesTotal) { String retryBrokerName = brokerName;//by default, it will send to the same broker if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send - MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); + MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false); retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen); } String addr = instance.findBrokerAddressInPublish(retryBrokerName); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 8851bc815..9484b26f8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -125,6 +126,12 @@ public class MQClientInstance { private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet(); private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); + private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread"); + } + }); private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 3f4c6e5f7..bbbb17b07 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -33,6 +33,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -49,6 +51,8 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.client.latency.Resolver; +import org.apache.rocketmq.client.latency.ServiceDetector; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; @@ -112,7 +116,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<>(); - private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + private MQFaultStrategy mqFaultStrategy; private ExecutorService asyncSenderExecutor; // compression related @@ -153,8 +157,38 @@ public class DefaultMQProducerImpl implements MQProducerInner { semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true); log.info("semaphoreAsyncSendSize can not be smaller than 1M."); } - } + ServiceDetector serviceDetector = new ServiceDetector() { + @Override + public boolean detect(String endpoint, long timeoutMillis) { + Optional<String> candidateTopic = pickTopic(); + if (!candidateTopic.isPresent()) { + return false; + } + try { + MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0); + mQClientFactory.getMQClientAPIImpl() + .getMaxOffset(endpoint, mq, timeoutMillis); + return true; + } catch (Exception e) { + return false; + } + } + }; + + this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() { + @Override + public String resolve(String name) { + return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name); + } + }, serviceDetector); + } + private Optional<String> pickTopic() { + if (topicPublishInfoTable.isEmpty()) { + return Optional.absent(); + } + return Optional.of(topicPublishInfoTable.keySet().iterator().next()); + } public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { this.checkForbiddenHookList.add(checkForbiddenHook); log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), @@ -229,6 +263,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { mQClientFactory.start(); } + if (this.mqFaultStrategy.isStartDetectorEnable()) { + this.mqFaultStrategy.startDetector(); + } + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; @@ -273,6 +311,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (shutdownFactory) { this.mQClientFactory.shutdown(); } + if (this.mqFaultStrategy.isStartDetectorEnable()) { + this.mqFaultStrategy.shutdown(); + } RequestFutureHolder.getInstance().shutdown(this); log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; @@ -574,7 +615,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, - final long timeout) throws MQClientException, RemotingTooMuchRequestException { + final long timeout) throws MQClientException, RemotingTooMuchRequestException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -584,7 +625,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { MessageQueue mq = null; try { List<MessageQueue> messageQueueList = - mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); + mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); @@ -609,12 +650,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); } - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { - return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { + return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex); } - public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { - this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, + boolean reachable) { + this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); } private void validateNameServerSetting() throws MQClientException { @@ -647,9 +689,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; + boolean resetIndex = false; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); - MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + if (times > 0) { + resetIndex = true; + } + MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); @@ -667,7 +713,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); switch (communicationMode) { case ASYNC: return null; @@ -684,9 +730,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { default: break; } - } catch (RemotingException | MQClientException e) { + } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); + log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn(msg.toString()); + exception = e; + continue; + } catch (RemotingException e) { + endTimestamp = System.currentTimeMillis(); + if (this.mqFaultStrategy.isStartDetectorEnable()) { + // Set this broker unreachable when detecting schedule task is running for RemotingException. + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); + } else { + // Otherwise, isolate this broker. + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true); + } log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); @@ -695,7 +754,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); @@ -712,7 +771,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); + this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index 275ada7ac..37b1f3252 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.client.impl.producer; import java.util.ArrayList; import java.util.List; + +import com.google.common.base.Preconditions; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -30,6 +32,10 @@ public class TopicPublishInfo { private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; + public interface QueueFilter { + boolean filter(MessageQueue mq); + } + public boolean isOrderTopic() { return orderTopic; } @@ -66,6 +72,40 @@ public class TopicPublishInfo { this.haveTopicRouterInfo = haveTopicRouterInfo; } + public MessageQueue selectOneMessageQueue(QueueFilter ...filter) { + return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter); + } + + private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { + if (messageQueueList == null || messageQueueList.isEmpty()) { + return null; + } + + if (filter != null && filter.length != 0) { + for (int i = 0; i < messageQueueList.size(); i++) { + int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); + MessageQueue mq = messageQueueList.get(index); + boolean filterResult = true; + for (QueueFilter f: filter) { + Preconditions.checkNotNull(f); + filterResult &= f.filter(mq); + } + if (filterResult) { + return mq; + } + } + + return null; + } + + int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); + return messageQueueList.get(index); + } + + public void resetIndex() { + this.sendWhichQueue.reset(); + } + public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java index 09a8aa461..72d2f3450 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java @@ -18,11 +18,75 @@ package org.apache.rocketmq.client.latency; public interface LatencyFaultTolerance<T> { - void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); + /** + * Update brokers' states, to decide if they are good or not. + * + * @param name Broker's name. + * @param currentLatency Current message sending process's latency. + * @param notAvailableDuration Corresponding not available time, ms. The broker will be not available until it + * spends such time. + * @param reachable To decide if this broker is reachable or not. + */ + void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration, + final boolean reachable); + /** + * To check if this broker is available. + * + * @param name Broker's name. + * @return boolean variable, if this is true, then the broker is available. + */ boolean isAvailable(final T name); + /** + * To check if this broker is reachable. + * + * @param name Broker's name. + * @return boolean variable, if this is true, then the broker is reachable. + */ + boolean isReachable(final T name); + + /** + * Remove the broker in this fault item table. + * + * @param name broker's name. + */ void remove(final T name); + /** + * The worst situation, no broker can be available. Then choose random one. + * + * @return A random mq will be returned. + */ T pickOneAtLeast(); + + /** + * Start a new thread, to detect the broker's reachable tag. + */ + void startDetector(); + + /** + * Shutdown threads that started by LatencyFaultTolerance. + */ + void shutdown(); + + /** + * A function reserved, just detect by once, won't create a new thread. + */ + void detectByOneRound(); + + /** + * Use it to set the detect timeout bound. + * + * @param detectTimeout timeout bound + */ + void setDetectTimeout(final int detectTimeout); + + /** + * Use it to set the detector's detector interval for each broker (each broker will be detected once during this + * time) + * + * @param detectInterval each broker's detecting interval + */ + void setDetectInterval(final int detectInterval); } diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index 93795d957..8af629574 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -21,30 +21,97 @@ import java.util.Collections; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { - private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16); + private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class); + private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); + private int detectTimeout = 200; + private int detectInterval = 2000; + private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "LatencyFaultToleranceScheduledThread"); + } + }); - private final ThreadLocalIndex randomItem = new ThreadLocalIndex(); + private final Resolver resolver; + + private final ServiceDetector serviceDetector; + + public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetector) { + this.resolver = resolver; + this.serviceDetector = serviceDetector; + } + + public void detectByOneRound() { + for (Map.Entry<String, FaultItem> item : this.faultItemTable.entrySet()) { + FaultItem brokerItem = item.getValue(); + if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) { + brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval; + String brokerAddr = resolver.resolve(brokerItem.getName()); + if (brokerAddr == null) { + faultItemTable.remove(item.getKey()); + continue; + } + if (null == serviceDetector) { + continue; + } + boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout); + if (serviceOK && !brokerItem.reachableFlag) { + log.info(brokerItem.name + " is reachable now, then it can be used."); + brokerItem.reachableFlag = true; + } + } + } + } + + public void startDetector() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + detectByOneRound(); + } catch (Exception e) { + log.warn("Unexpected exception raised while detecting service reachability", e); + } + } + }, 3, 3, TimeUnit.SECONDS); + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } @Override - public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { + public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration, + final boolean reachable) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); - faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); - + faultItem.updateNotAvailableDuration(notAvailableDuration); + faultItem.setReachable(reachable); old = this.faultItemTable.putIfAbsent(name, faultItem); - if (old != null) { - old.setCurrentLatency(currentLatency); - old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); - } - } else { + } + + if (null != old) { old.setCurrentLatency(currentLatency); - old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); + old.updateNotAvailableDuration(notAvailableDuration); + old.setReachable(reachable); + } + + if (!reachable) { + log.info(name + " is unreachable, it will not be used until it's reachable"); } } @@ -57,6 +124,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> return true; } + public boolean isReachable(final String name) { + final FaultItem faultItem = this.faultItemTable.get(name); + if (faultItem != null) { + return faultItem.isReachable(); + } + return true; + } + @Override public void remove(final String name) { this.faultItemTable.remove(name); @@ -65,68 +140,98 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); - List<FaultItem> tmpList = new LinkedList<>(); + List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } + if (!tmpList.isEmpty()) { - Collections.sort(tmpList); - final int half = tmpList.size() / 2; - if (half <= 0) { - return tmpList.get(0).getName(); - } else { - final int i = this.randomItem.incrementAndGet() % half; - return tmpList.get(i).getName(); + Collections.shuffle(tmpList); + for (FaultItem faultItem : tmpList) { + if (faultItem.reachableFlag) { + return faultItem.name; + } } } + return null; } @Override public String toString() { return "LatencyFaultToleranceImpl{" + - "faultItemTable=" + faultItemTable + - ", whichItemWorst=" + randomItem + - '}'; + "faultItemTable=" + faultItemTable + + ", whichItemWorst=" + whichItemWorst + + '}'; + } + + public void setDetectTimeout(final int detectTimeout) { + this.detectTimeout = detectTimeout; } - class FaultItem implements Comparable<FaultItem> { + public void setDetectInterval(final int detectInterval) { + this.detectInterval = detectInterval; + } + + public class FaultItem implements Comparable<FaultItem> { private final String name; private volatile long currentLatency; private volatile long startTimestamp; + private volatile long checkStamp; + private volatile boolean reachableFlag; public FaultItem(final String name) { this.name = name; } + public void updateNotAvailableDuration(long notAvailableDuration) { + if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) { + this.startTimestamp = System.currentTimeMillis() + notAvailableDuration; + log.info(name + " will be isolated for " + notAvailableDuration + " ms."); + } + } + @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { - if (this.isAvailable()) + if (this.isAvailable()) { return -1; + } - if (other.isAvailable()) + if (other.isAvailable()) { return 1; + } } - if (this.currentLatency < other.currentLatency) + if (this.currentLatency < other.currentLatency) { return -1; - else if (this.currentLatency > other.currentLatency) { + } else if (this.currentLatency > other.currentLatency) { return 1; } - if (this.startTimestamp < other.startTimestamp) + if (this.startTimestamp < other.startTimestamp) { return -1; - else if (this.startTimestamp > other.startTimestamp) { + } else if (this.startTimestamp > other.startTimestamp) { return 1; } - return 0; } + public void setReachable(boolean reachableFlag) { + this.reachableFlag = reachableFlag; + } + + public void setCheckStamp(long checkStamp) { + this.checkStamp = checkStamp; + } + public boolean isAvailable() { - return (System.currentTimeMillis() - startTimestamp) >= 0; + return reachableFlag && System.currentTimeMillis() >= startTimestamp; + } + + public boolean isReachable() { + return reachableFlag; } @Override @@ -139,28 +244,32 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public boolean equals(final Object o) { - if (this == o) + if (this == o) { return true; - if (!(o instanceof FaultItem)) + } + if (!(o instanceof FaultItem)) { return false; + } final FaultItem faultItem = (FaultItem) o; - if (getCurrentLatency() != faultItem.getCurrentLatency()) + if (getCurrentLatency() != faultItem.getCurrentLatency()) { return false; - if (getStartTimestamp() != faultItem.getStartTimestamp()) + } + if (getStartTimestamp() != faultItem.getStartTimestamp()) { return false; + } return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; - } @Override public String toString() { return "FaultItem{" + - "name='" + name + '\'' + - ", currentLatency=" + currentLatency + - ", startTimestamp=" + startTimestamp + - '}'; + "name='" + name + '\'' + + ", currentLatency=" + currentLatency + + ", startTimestamp=" + startTimestamp + + ", reachableFlag=" + reachableFlag + + '}'; } public String getName() { diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index 1e1953fad..c01490784 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -17,25 +17,86 @@ package org.apache.rocketmq.client.latency; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class MQFaultStrategy { - private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class); - private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); + private LatencyFaultTolerance<String> latencyFaultTolerance; + private boolean sendLatencyFaultEnable; + private boolean startDetectorEnable; + private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L}; + private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L}; - private boolean sendLatencyFaultEnable = false; + public static class BrokerFilter implements QueueFilter { + private String lastBrokerName; + + public void setLastBrokerName(String lastBrokerName) { + this.lastBrokerName = lastBrokerName; + } + + @Override public boolean filter(MessageQueue mq) { + if (lastBrokerName != null) { + return !mq.getBrokerName().equals(lastBrokerName); + } + return true; + } + } + + private ThreadLocal<BrokerFilter> threadBrokerFilter = new ThreadLocal<BrokerFilter>() { + @Override protected BrokerFilter initialValue() { + return new BrokerFilter(); + } + }; + + private QueueFilter reachableFilter = new QueueFilter() { + @Override public boolean filter(MessageQueue mq) { + return latencyFaultTolerance.isReachable(mq.getBrokerName()); + } + }; + + private QueueFilter availableFilter = new QueueFilter() { + @Override public boolean filter(MessageQueue mq) { + return latencyFaultTolerance.isAvailable(mq.getBrokerName()); + } + }; + + + public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) { + this.setStartDetectorEnable(cc.isStartDetectorEnable()); + this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); + this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector); + this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); + this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); + } + + // For unit test. + public MQFaultStrategy(ClientConfig cc, LatencyFaultTolerance<String> tolerance) { + this.setStartDetectorEnable(cc.isStartDetectorEnable()); + this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); + this.latencyFaultTolerance = tolerance; + this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); + this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); + } - private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; - private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } + public QueueFilter getAvailableFilter() { + return availableFilter; + } + + public QueueFilter getReachableFilter() { + return reachableFilter; + } + + public ThreadLocal<BrokerFilter> getThreadBrokerFilter() { + return threadBrokerFilter; + } + public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } @@ -56,51 +117,69 @@ public class MQFaultStrategy { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { + public boolean isStartDetectorEnable() { + return startDetectorEnable; + } + + public void setStartDetectorEnable(boolean startDetectorEnable) { + this.startDetectorEnable = startDetectorEnable; + } + + public void startDetector() { + // user should start the detector + // and the thread should not be in running state. + if (this.sendLatencyFaultEnable && this.startDetectorEnable) { + // start the detector. + this.latencyFaultTolerance.startDetector(); + } + } + + public void shutdown() { + if (this.sendLatencyFaultEnable && this.startDetectorEnable) { + this.latencyFaultTolerance.shutdown(); + } + } + + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { + BrokerFilter brokerFilter = threadBrokerFilter.get(); + brokerFilter.setLastBrokerName(lastBrokerName); if (this.sendLatencyFaultEnable) { - try { - int index = tpInfo.getSendWhichQueue().incrementAndGet(); - for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { - int pos = index++ % tpInfo.getMessageQueueList().size(); - MessageQueue mq = tpInfo.getMessageQueueList().get(pos); - if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) { - return mq; - } - } - - final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); - int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker); - if (writeQueueNums > 0) { - final MessageQueue mq = tpInfo.selectOneMessageQueue(); - if (notBestBroker != null) { - mq.setBrokerName(notBestBroker); - mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); - } - return mq; - } else { - latencyFaultTolerance.remove(notBestBroker); - } - } catch (Exception e) { - log.error("Error occurred when selecting message queue", e); + if (resetIndex) { + tpInfo.resetIndex(); + } + MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter); + if (mq != null) { + return mq; + } + + mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter); + if (mq != null) { + return mq; } return tpInfo.selectOneMessageQueue(); } - return tpInfo.selectOneMessageQueue(lastBrokerName); + MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter); + if (mq != null) { + return mq; + } + return tpInfo.selectOneMessageQueue(); } - public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, + final boolean reachable) { if (this.sendLatencyFaultEnable) { - long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); - this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); + long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency); + this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { - if (currentLatency >= latencyMax[i]) + if (currentLatency >= latencyMax[i]) { return this.notAvailableDuration[i]; + } } return 0; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java similarity index 65% rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java rename to client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java index 6aa547047..1c29ba334 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java @@ -14,20 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.remoting.protocol.body; +package org.apache.rocketmq.client.latency; -import java.util.ArrayList; -import java.util.List; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +public interface Resolver { -public class GetRemoteClientConfigBody extends RemotingSerializable { - private List<String> keys = new ArrayList<>(); - - public List<String> getKeys() { - return keys; - } - - public void setKeys(List<String> keys) { - this.keys = keys; - } + String resolve(String name); } diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java new file mode 100644 index 000000000..c6ffbad1c --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.latency; + +/** + * Detect whether the remote service state is normal. + */ +public interface ServiceDetector { + + /** + * Check if the remote service is normal. + * @param endpoint Service endpoint to check against + * @return true if the service is back to normal; false otherwise. + */ + boolean detect(String endpoint, long timeoutMillis); +} diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java index 86690e40b..42ccdae5a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java @@ -16,11 +16,14 @@ */ package org.apache.rocketmq.client.latency; -import java.util.concurrent.TimeUnit; +import org.awaitility.core.ThrowingRunnable; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; public class LatencyFaultToleranceImplTest { private LatencyFaultTolerance<String> latencyFaultTolerance; @@ -29,28 +32,31 @@ public class LatencyFaultToleranceImplTest { @Before public void init() { - latencyFaultTolerance = new LatencyFaultToleranceImpl(); + latencyFaultTolerance = new LatencyFaultToleranceImpl(null, null); } @Test public void testUpdateFaultItem() throws Exception { - latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000); + latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true); assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue(); } @Test public void testIsAvailable() throws Exception { - latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50); + latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50, true); assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); - TimeUnit.MILLISECONDS.sleep(70); - assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); + await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(new ThrowingRunnable() { + @Override public void run() throws Throwable { + assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); + } + }); } @Test public void testRemove() throws Exception { - latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000); + latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true); assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); latencyFaultTolerance.remove(brokerName); assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); @@ -58,10 +64,20 @@ public class LatencyFaultToleranceImplTest { @Test public void testPickOneAtLeast() throws Exception { - latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000); + latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true); assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); - latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000); - assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); + // Bad case, since pickOneAtLeast's behavior becomes random + // latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, "127.0.0.1:12011", true); + // assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); + } + + @Test + public void testIsReachable() throws Exception { + latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true); + assertThat(latencyFaultTolerance.isReachable(brokerName)).isEqualTo(true); + + latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, false); + assertThat(latencyFaultTolerance.isReachable(anotherBrokerName)).isEqualTo(false); } } \ No newline at end of file diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index fada0efd7..485b95c42 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody; -import org.apache.rocketmq.remoting.protocol.body.GetRemoteClientConfigBody; import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; @@ -132,8 +131,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); - case RequestCode.GET_CLIENT_CONFIG: - return this.getClientConfigs(ctx, request); default: String error = " request type " + request.getCode() + " not supported"; return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); @@ -661,25 +658,4 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getClientConfigs(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetRemoteClientConfigBody body = GetRemoteClientConfigBody.decode(request.getBody(), GetRemoteClientConfigBody.class); - - String content = this.namesrvController.getConfiguration().getClientConfigsFormatString(body.getKeys()); - if (StringUtils.isNotBlank(content)) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("getConfig error, ", e); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 2994893d7..b2478fec3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -232,6 +232,12 @@ public class ProxyConfig implements ConfigFile { private String remotingAccessAddr = ""; private int remotingListenPort = 8080; + // related to proxy's send strategy in cluster mode. + private boolean sendLatencyEnable = false; + private boolean startDetectorEnable = false; + private int detectTimeout = 200; + private int detectInterval = 2 * 1000; + private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER; private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER; private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; @@ -1409,6 +1415,46 @@ public class ProxyConfig implements ConfigFile { this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue; } + public boolean isSendLatencyEnable() { + return sendLatencyEnable; + } + + public boolean isStartDetectorEnable() { + return startDetectorEnable; + } + + public void setStartDetectorEnable(boolean startDetectorEnable) { + this.startDetectorEnable = startDetectorEnable; + } + + public void setSendLatencyEnable(boolean sendLatencyEnable) { + this.sendLatencyEnable = sendLatencyEnable; + } + + public boolean getStartDetectorEnable() { + return this.startDetectorEnable; + } + + public boolean getSendLatencyEnable() { + return this.sendLatencyEnable; + } + + public int getDetectTimeout() { + return detectTimeout; + } + + public void setDetectTimeout(int detectTimeout) { + this.detectTimeout = detectTimeout; + } + + public int getDetectInterval() { + return detectInterval; + } + + public void setDetectInterval(int detectInterval) { + this.detectInterval = detectInterval; + } + public boolean isEnableBatchAck() { return enableBatchAck; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java index 6146c80cd..f670df205 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java @@ -382,7 +382,7 @@ public class SendMessageActivity extends AbstractMessingActivity { int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()); targetMessageQueue = writeQueues.get(bucket); } else { - targetMessageQueue = messageQueueView.getWriteSelector().selectOne(false); + targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false); } return targetMessageQueue; } catch (Exception e) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 0d0c62168..a80f6df0b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -66,6 +67,8 @@ public class ProducerProcessor extends AbstractProcessor { public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSelector queueSelector, String producerGroup, int sysFlag, List<Message> messageList, long timeoutMillis) { CompletableFuture<List<SendResult>> future = new CompletableFuture<>(); + long beginTimestampFirst = System.currentTimeMillis(); + AddressableMessageQueue messageQueue = null; try { Message message = messageList.get(0); String topic = message.getTopic(); @@ -79,7 +82,7 @@ public class ProducerProcessor extends AbstractProcessor { } } } - AddressableMessageQueue messageQueue = queueSelector.select(ctx, + messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic)); if (messageQueue == null) { throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue"); @@ -90,6 +93,7 @@ public class ProducerProcessor extends AbstractProcessor { } SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId()); + AddressableMessageQueue finalMessageQueue = messageQueue; future = this.serviceManager.getMessageService().sendMessage( ctx, messageQueue, @@ -102,11 +106,19 @@ public class ProducerProcessor extends AbstractProcessor { if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) && tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE && StringUtils.isNotBlank(sendResult.getTransactionId())) { - fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList); + fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList); } } return sendResultList; - }, this.executor); + }, this.executor) + .whenComplete((result, exception) -> { + long endTimestamp = System.currentTimeMillis(); + if (exception != null) { + this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false); + } else { + this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(),endTimestamp - beginTimestampFirst, false, true); + } + }); } catch (Throwable t) { future.completeExceptionally(t); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java index d67b68f38..aced15cee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java @@ -54,7 +54,7 @@ public class LocalTopicRouteService extends TopicRouteService { @Override public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topic) throws Exception { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); - return new MessageQueueView(topic, toTopicRouteData(topicConfig)); + return new MessageQueueView(topic, toTopicRouteData(topicConfig), null); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index 85cd18d45..f25fb907e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import java.util.ArrayList; import java.util.Collections; @@ -30,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -44,8 +47,9 @@ public class MessageQueueSelector { private final Map<String, AddressableMessageQueue> brokerNameQueueMap = new ConcurrentHashMap<>(); private final AtomicInteger queueIndex; private final AtomicInteger brokerIndex; + private MQFaultStrategy mqFaultStrategy; - public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { if (read) { this.queues.addAll(buildRead(topicRouteWrapper)); } else { @@ -55,6 +59,7 @@ public class MessageQueueSelector { Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); + this.mqFaultStrategy = mqFaultStrategy; } private static List<AddressableMessageQueue> buildRead(TopicRouteWrapper topicRoute) { @@ -154,6 +159,86 @@ public class MessageQueueSelector { return selectOneByIndex(nextIndex, onlyBroker); } + public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { + if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) { + List<MessageQueue> messageQueueList = null; + MessageQueue messageQueue = null; + if (onlyBroker) { + messageQueueList = transferAddressableQueues(brokerActingQueues); + } else { + messageQueueList = transferAddressableQueues(queues); + } + AddressableMessageQueue addressableMessageQueue = null; + + // use both available filter. + messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, + mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter()); + addressableMessageQueue = transferQueue2Addressable(messageQueue); + if (addressableMessageQueue != null) { + return addressableMessageQueue; + } + + // use available filter. + messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, + mqFaultStrategy.getAvailableFilter()); + addressableMessageQueue = transferQueue2Addressable(messageQueue); + if (addressableMessageQueue != null) { + return addressableMessageQueue; + } + + // no available filter, then use reachable filter. + messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, + mqFaultStrategy.getReachableFilter()); + addressableMessageQueue = transferQueue2Addressable(messageQueue); + if (addressableMessageQueue != null) { + return addressableMessageQueue; + } + } + + // SendLatency is not enabled, or no queue is selected, then select by index. + return selectOne(onlyBroker); + } + + private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) { + if (messageQueueList == null || messageQueueList.isEmpty()) { + return null; + } + if (filter != null && filter.length != 0) { + for (int i = 0; i < messageQueueList.size(); i++) { + int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); + MessageQueue mq = messageQueueList.get(index); + boolean filterResult = true; + for (TopicPublishInfo.QueueFilter f: filter) { + Preconditions.checkNotNull(f); + filterResult &= f.filter(mq); + } + if (filterResult) { + return mq; + } + } + } + return null; + } + + public List<MessageQueue> transferAddressableQueues(List<AddressableMessageQueue> addressableMessageQueueList) { + if (addressableMessageQueueList == null) { + return null; + } + + return addressableMessageQueueList.stream() + .map(AddressableMessageQueue::getMessageQueue) + .collect(Collectors.toList()); + } + + private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) { + for (AddressableMessageQueue amq: queues) { + if (amq.getMessageQueue().equals(messageQueue)) { + return amq; + } + } + return null; + } + public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) { boolean onlyBroker = last.getQueueId() < 0; AddressableMessageQueue newOne = last; @@ -190,6 +275,14 @@ public class MessageQueueSelector { return brokerActingQueues; } + public MQFaultStrategy getMQFaultStrategy() { + return mqFaultStrategy; + } + + public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { + this.mqFaultStrategy = mqFaultStrategy; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index fe5387cfd..8b3c2f7c8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -17,20 +17,22 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; +import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class MessageQueueView { - public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData()); + public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData(), null); private final MessageQueueSelector readSelector; private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; + private MQFaultStrategy mqFaultStrategy; - public MessageQueueView(String topic, TopicRouteData topicRouteData) { + public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); - this.readSelector = new MessageQueueSelector(topicRouteWrapper, true); - this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false); + this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true); + this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false); } public TopicRouteData getTopicRouteData() { @@ -65,4 +67,12 @@ public class MessageQueueView { .add("topicRouteWrapper", topicRouteWrapper) .toString(); } + + public MQFaultStrategy getMQFaultStrategy() { + return mqFaultStrategy; + } + + public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { + this.mqFaultStrategy = mqFaultStrategy; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 84348adc3..74769a423 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -25,7 +25,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; +import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.client.latency.Resolver; +import org.apache.rocketmq.client.latency.ServiceDetector; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -39,6 +45,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -47,6 +54,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final MQClientAPIFactory mqClientAPIFactory; + private MQFaultStrategy mqFaultStrategy; protected final LoadingCache<String /* topicName */, MessageQueueView> topicCache; protected final ScheduledExecutorService scheduledExecutorService; @@ -97,15 +105,83 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { } } }); - + ServiceDetector serviceDetector = new ServiceDetector() { + @Override + public boolean detect(String endpoint, long timeoutMillis) { + Optional<String> candidateTopic = pickTopic(); + if (!candidateTopic.isPresent()) { + return false; + } + try { + GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); + requestHeader.setTopic(candidateTopic.get()); + requestHeader.setQueueId(0); + Long maxOffset = mqClientAPIFactory.getClient().getMaxOffset(endpoint, requestHeader, timeoutMillis).get(); + return true; + } catch (Exception e) { + return false; + } + } + }; + mqFaultStrategy = new MQFaultStrategy(extractClientConfigFromProxyConfig(config), new Resolver() { + @Override + public String resolve(String name) { + try { + String brokerAddr = getBrokerAddr(null, name); + return brokerAddr; + } catch (Exception e) { + return null; + } + } + }, serviceDetector); this.init(); } + // pickup one topic in the topic cache + private Optional<String> pickTopic() { + if (topicCache.asMap().isEmpty()) { + return Optional.absent(); + } + return Optional.of(topicCache.asMap().keySet().iterator().next()); + } + protected void init() { this.appendShutdown(this.scheduledExecutorService::shutdown); this.appendStartAndShutdown(this.mqClientAPIFactory); } + @Override + public void shutdown() throws Exception { + if (this.mqFaultStrategy.isStartDetectorEnable()) { + mqFaultStrategy.shutdown(); + } + } + + @Override + public void start() throws Exception { + if (this.mqFaultStrategy.isStartDetectorEnable()) { + this.mqFaultStrategy.startDetector(); + } + } + + public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { + ClientConfig tempClientConfig = new ClientConfig(); + tempClientConfig.setSendLatencyEnable(proxyConfig.getSendLatencyEnable()); + tempClientConfig.setStartDetectorEnable(proxyConfig.getStartDetectorEnable()); + tempClientConfig.setDetectTimeout(proxyConfig.getDetectTimeout()); + tempClientConfig.setDetectInterval(proxyConfig.getDetectInterval()); + return tempClientConfig; + } + + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, + boolean reachable) { + this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); + } + + public MQFaultStrategy getMqFaultStrategy() { + return this.mqFaultStrategy; + } + public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String topicName) throws Exception { return getCacheMessageQueueWrapper(this.topicCache, topicName); } @@ -136,7 +212,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy()); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 7fd9a9ffd..77ae5e4d1 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -93,7 +93,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); - ProxyContext context = createContext(); context.setRemainingMs(1L); this.receiveMessageActivity.receiveMessage( @@ -274,7 +273,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { } @Test - public void testReceiveMessageQueueSelector() { + public void testReceiveMessageQueueSelector() throws Exception { TopicRouteData topicRouteData = new TopicRouteData(); List<QueueData> queueDatas = new ArrayList<>(); for (int i = 0; i < 2; i++) { @@ -298,7 +297,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { } topicRouteData.setBrokerDatas(brokerDatas); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); ReceiveMessageActivity.ReceiveMessageQueueSelector selector = new ReceiveMessageActivity.ReceiveMessageQueueSelector(""); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java index 588423bb9..4882a5ed8 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java @@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.MixAll; @@ -49,6 +51,7 @@ import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueView; +import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; @@ -62,15 +65,19 @@ import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class SendMessageActivityTest extends BaseActivityTest { protected static final String BROKER_NAME = "broker"; + protected static final String BROKER_NAME2 = "broker2"; protected static final String CLUSTER_NAME = "cluster"; protected static final String BROKER_ADDR = "127.0.0.1:10911"; + protected static final String BROKER_ADDR2 = "127.0.0.1:10912"; private static final String TOPIC = "topic"; private static final String CONSUMER_GROUP = "consumerGroup"; + MQFaultStrategy mqFaultStrategy; private SendMessageActivity sendMessageActivity; @@ -262,7 +269,7 @@ public class SendMessageActivityTest extends BaseActivityTest { } @Test - public void testSendOrderMessageQueueSelector() { + public void testSendOrderMessageQueueSelector() throws Exception { TopicRouteData topicRouteData = new TopicRouteData(); QueueData queueData = new QueueData(); BrokerData brokerData = new BrokerData(); @@ -277,7 +284,7 @@ public class SendMessageActivityTest extends BaseActivityTest { brokerData.setBrokerAddrs(brokerAddrs); topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); SendMessageActivity.SendMessageQueueSelector selector1 = new SendMessageActivity.SendMessageQueueSelector( SendMessageRequest.newBuilder() .addMessages(Message.newBuilder() @@ -288,6 +295,12 @@ public class SendMessageActivityTest extends BaseActivityTest { .build() ); + TopicRouteService topicRouteService = mock(TopicRouteService.class); + MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); + when(topicRouteService.getAllMessageQueueView(any(), any())).thenReturn(messageQueueView); + when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); + when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); + SendMessageActivity.SendMessageQueueSelector selector2 = new SendMessageActivity.SendMessageQueueSelector( SendMessageRequest.newBuilder() .addMessages(Message.newBuilder() @@ -328,12 +341,17 @@ public class SendMessageActivityTest extends BaseActivityTest { brokerData.setBrokerAddrs(brokerAddrs); topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); + SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector( SendMessageRequest.newBuilder() .addMessages(Message.newBuilder().build()) .build() ); + TopicRouteService topicRouteService = mock(TopicRouteService.class); + MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); + when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); + when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); @@ -343,6 +361,45 @@ public class SendMessageActivityTest extends BaseActivityTest { assertNotEquals(firstSelect, secondSelect); } + @Test + public void testSendNormalMessageQueueSelectorPipeLine() throws Exception { + TopicRouteData topicRouteData = new TopicRouteData(); + int queueNums = 2; + + QueueData queueData = createQueueData(BROKER_NAME, queueNums); + QueueData queueData2 = createQueueData(BROKER_NAME2, queueNums); + topicRouteData.setQueueDatas(Lists.newArrayList(queueData,queueData2)); + + + BrokerData brokerData = createBrokerData(CLUSTER_NAME, BROKER_NAME, BROKER_ADDR); + BrokerData brokerData2 = createBrokerData(CLUSTER_NAME, BROKER_NAME2, BROKER_ADDR2); + topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, brokerData2)); + + SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector( + SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder().build()) + .build() + ); + + ClientConfig cc = new ClientConfig(); + this.mqFaultStrategy = new MQFaultStrategy(cc, null, null); + mqFaultStrategy.setSendLatencyFaultEnable(true); + mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true); + mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false); + + TopicRouteService topicRouteService = mock(TopicRouteService.class); + when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); + + + AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); + assertEquals(firstSelect.getBrokerName(), BROKER_NAME2); + + mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, false); + mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, true); + AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); + assertEquals(secondSelect.getBrokerName(), BROKER_NAME); + } @Test public void testParameterValidate() { // too large message body @@ -850,4 +907,23 @@ public class SendMessageActivityTest extends BaseActivityTest { } return sb.toString(); } + + private static QueueData createQueueData(String brokerName, int writeQueueNums) { + QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setWriteQueueNums(writeQueueNums); + queueData.setPerm(PermName.PERM_WRITE); + return queueData; + } + + private static BrokerData createBrokerData(String clusterName, String brokerName, String brokerAddrs) { + BrokerData brokerData = new BrokerData(); + brokerData.setCluster(clusterName); + brokerData.setBrokerName(brokerName); + HashMap<Long, String> brokerAddrsMap = new HashMap<>(); + brokerAddrsMap.put(MixAll.MASTER_ID, brokerAddrs); + brokerData.setBrokerAddrs(brokerAddrsMap); + + return brokerData; + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java index c97bd5a72..ca6fe909e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java @@ -78,7 +78,7 @@ public class BaseServiceTest extends InitConfigTest { topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); when(this.topicRouteService.getAllMessageQueueView(any(), eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); - when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); - when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, null)); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java index e44ed28f4..d150f87c4 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java @@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { public void testReadMessageQueue() { queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { @@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { public void testWriteMessageQueue() { queueData.setPerm(PermName.PERM_WRITE); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_WRITE); queueData.setWriteQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index c67f4953d..43fba3d03 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -132,7 +132,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { brokerAddr.put(0L, "127.0.0.1:10911"); brokerData.setBrokerAddrs(brokerAddr); topicRouteData.getBrokerDatas().add(brokerData); - MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData); + MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData, null); when(this.topicRouteService.getAllMessageQueueView(any(), anyString())).thenReturn(messageQueueView); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java index a0063544e..91af74cbe 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java @@ -64,7 +64,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager, this.mqClientAPIFactory); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); when(this.topicRouteService.getAllMessageQueueView(any(), anyString())) .thenReturn(messageQueueView); @@ -127,7 +127,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { brokerData.setBrokerAddrs(brokerAddrs); topicRouteData.getQueueDatas().add(queueData); topicRouteData.getBrokerDatas().add(brokerData); - when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null)); TopicRouteData clusterTopicRouteData = new TopicRouteData(); QueueData clusterQueueData = new QueueData(); @@ -141,7 +141,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR); clusterBrokerData.setBrokerAddrs(brokerAddrs); clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData)); - when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData, null)); TopicRouteData clusterTopicRouteData2 = new TopicRouteData(); QueueData clusterQueueData2 = new QueueData(); @@ -155,7 +155,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2); clusterBrokerData2.setBrokerAddrs(brokerAddrs); clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2)); - when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2, null)); ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2); this.clusterTransactionService.start(); -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2