Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch016-backport-Optimize-fau...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch016-backport-Optimize-fault-tolerant-mechanism.patch of Package rocketmq
From e11e29419f6e2d1d9673d0329e57b824ebf3da47 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 6 Sep 2023 20:42:24 +0800 Subject: [PATCH 1/3] [ISSUE #7308] Adding topic blacklist and filter in tiered storage module (#7310) --- .../tieredstore/TieredDispatcher.java | 21 +++++++-- .../tieredstore/TieredMessageStore.java | 1 + .../file/TieredFlatFileManager.java | 17 ++++--- .../TieredStoreTopicBlackListFilter.java | 45 +++++++++++++++++++ .../provider/TieredStoreTopicFilter.java | 25 +++++++++++ .../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++ 6 files changed, 136 insertions(+), 9 deletions(-) create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 430c2b62e..766c559e9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; +import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter; +import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter; import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + private TieredStoreTopicFilter topicFilter; private final String brokerName; private final MessageStore defaultStore; private final TieredMessageStoreConfig storeConfig; @@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch this.defaultStore = defaultStore; this.storeConfig = storeConfig; this.brokerName = storeConfig.getBrokerName(); + this.topicFilter = new TieredStoreTopicBlackListFilter(); this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig); this.dispatchRequestReadMap = new ConcurrentHashMap<>(); this.dispatchRequestWriteMap = new ConcurrentHashMap<>(); this.dispatchTaskLock = new ReentrantLock(); this.dispatchWriteLock = new ReentrantLock(); - this.initScheduleTask(); } - private void initScheduleTask() { + protected void initScheduleTask() { TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { if (!flatFile.getCompositeFlatFileLock().isLocked()) { @@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch }), 30, 10, TimeUnit.SECONDS); } + public TieredStoreTopicFilter getTopicFilter() { + return topicFilter; + } + + public void setTopicFilter(TieredStoreTopicFilter topicFilter) { + this.topicFilter = topicFilter; + } + @Override public void dispatch(DispatchRequest request) { if (stopped) { @@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch } String topic = request.getTopic(); - if (TieredStoreUtil.isSystemTopic(topic)) { + if (topicFilter != null && topicFilter.filterTopic(topic)) { return; } @@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch return; } + if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) { + return; + } + if (flatFile.getDispatchOffset() == -1L) { return; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 78e855f36..9fb1b2f01 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { boolean loadNextStore = next.load(); boolean result = loadFlatFile && loadNextStore; if (result) { + dispatcher.initScheduleTask(); dispatcher.start(); } return result; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index e9ae4a5a5..7c744af3b 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -134,21 +134,21 @@ public class TieredFlatFileManager { public void doCleanExpiredFile() { long expiredTimeStamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); - Random random = new Random(); for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { - int delay = random.nextInt(storeConfig.getMaxCommitJitter()); - TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> { + TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { flatFile.getCompositeFlatFileLock().lock(); try { flatFile.cleanExpiredFile(expiredTimeStamp); flatFile.destroyExpiredFile(); if (flatFile.getConsumeQueueBaseOffset() == -1) { + logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); destroyCompositeFile(flatFile.getMessageQueue()); } } finally { flatFile.getCompositeFlatFileLock().unlock(); } - }, delay, TimeUnit.MILLISECONDS); + }); } if (indexFile != null) { indexFile.cleanExpiredFile(expiredTimeStamp); @@ -218,8 +218,13 @@ public class TieredFlatFileManager { storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId())); queueCount.incrementAndGet(); }); - logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", - topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); + + if (queueCount.get() == 0L) { + metadataStore.deleteTopic(topicMetadata.getTopic()); + } else { + logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", + topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); + } } catch (Exception e) { logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e); } finally { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java new file mode 100644 index 000000000..50adbb713 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.provider; + +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + +public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter { + + private final Set<String> topicBlackSet; + + public TieredStoreTopicBlackListFilter() { + this.topicBlackSet = new HashSet<>(); + } + + @Override + public boolean filterTopic(String topicName) { + if (StringUtils.isBlank(topicName)) { + return true; + } + return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName); + } + + @Override + public void addTopicToWhiteList(String topicName) { + this.topicBlackSet.add(topicName); + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java new file mode 100644 index 000000000..3f26b8b02 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.provider; + +public interface TieredStoreTopicFilter { + + boolean filterTopic(String topicName); + + void addTopicToWhiteList(String topicName); +} diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java new file mode 100644 index 000000000..2bf48173c --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.provider; + +import org.apache.rocketmq.common.topic.TopicValidator; +import org.junit.Assert; +import org.junit.Test; + +public class TieredStoreTopicBlackListFilterTest { + + @Test + public void filterTopicTest() { + TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter(); + Assert.assertTrue(topicFilter.filterTopic("")); + Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic")); + + String topicName = "WhiteTopic"; + Assert.assertFalse(topicFilter.filterTopic(topicName)); + topicFilter.addTopicToWhiteList(topicName); + Assert.assertTrue(topicFilter.filterTopic(topicName)); + } +} \ No newline at end of file -- 2.32.0.windows.2 From 628020537fa7035226bc8dcde9fa33d9d5df30ff Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Thu, 7 Sep 2023 16:17:47 +0800 Subject: [PATCH 2/3] [ISSUE #7293] Fix NPE when alter sync state set --- .../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index b0a67531d..d83a690f9 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -104,7 +104,7 @@ public class ReplicasInfoManager { } // Check master - if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { + if (syncStateInfo.getMasterBrokerId() == null || !syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); LOGGER.error("{}", err); -- 2.32.0.windows.2 From 6fd0073d6475c539e8f4c30dc4f104a56a21d724 Mon Sep 17 00:00:00 2001 From: Ji Juntao <juntao.jjt@alibaba-inc.com> Date: Thu, 7 Sep 2023 20:21:16 +0800 Subject: [PATCH 3/3] [ISSUE #7319] Optimize fault-tolerant mechanism for sending messages and hot update switch (#7320) --- .../impl/producer/DefaultMQProducerImpl.java | 8 ++------ .../client/latency/LatencyFaultTolerance.java | 14 +++++++++++++ .../latency/LatencyFaultToleranceImpl.java | 13 +++++++++++- .../client/latency/MQFaultStrategy.java | 20 +++++++------------ .../proxy/service/route/MessageQueueView.java | 9 --------- .../service/route/TopicRouteService.java | 10 +++++++++- 6 files changed, 44 insertions(+), 30 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2d6b83ac2..b0c212e46 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { mQClientFactory.start(); } - if (this.mqFaultStrategy.isStartDetectorEnable()) { - this.mqFaultStrategy.startDetector(); - } + this.mqFaultStrategy.startDetector(); log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); @@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (shutdownFactory) { this.mQClientFactory.shutdown(); } - if (this.mqFaultStrategy.isStartDetectorEnable()) { - this.mqFaultStrategy.shutdown(); - } + this.mqFaultStrategy.shutdown(); RequestFutureHolder.getInstance().shutdown(this); log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java index 72d2f3450..17aaa266a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java @@ -89,4 +89,18 @@ public interface LatencyFaultTolerance<T> { * @param detectInterval each broker's detecting interval */ void setDetectInterval(final int detectInterval); + + /** + * Use it to set the detector work or not. + * + * @param startDetectorEnable set the detector's work status + */ + void setStartDetectorEnable(final boolean startDetectorEnable); + + /** + * Use it to judge if the detector enabled. + * + * @return is the detector should be started. + */ + boolean isStartDetectorEnable(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index 8af629574..d3ff7eb45 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> private int detectTimeout = 200; private int detectInterval = 2000; private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); + + private volatile boolean startDetectorEnable = false; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> @Override public void run() { try { - detectByOneRound(); + if (startDetectorEnable) { + detectByOneRound(); + } } catch (Exception e) { log.warn("Unexpected exception raised while detecting service reachability", e); } @@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> this.faultItemTable.remove(name); } + public boolean isStartDetectorEnable() { + return startDetectorEnable; + } + + public void setStartDetectorEnable(boolean startDetectorEnable) { + this.startDetectorEnable = startDetectorEnable; + } @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index c01490784..69fb533e5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue; public class MQFaultStrategy { private LatencyFaultTolerance<String> latencyFaultTolerance; - private boolean sendLatencyFaultEnable; - private boolean startDetectorEnable; + private volatile boolean sendLatencyFaultEnable; + private volatile boolean startDetectorEnable; private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L}; @@ -64,11 +64,11 @@ public class MQFaultStrategy { public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) { - this.setStartDetectorEnable(cc.isStartDetectorEnable()); - this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector); this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); + this.setStartDetectorEnable(cc.isStartDetectorEnable()); + this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); } // For unit test. @@ -123,21 +123,15 @@ public class MQFaultStrategy { public void setStartDetectorEnable(boolean startDetectorEnable) { this.startDetectorEnable = startDetectorEnable; + this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable); } public void startDetector() { - // user should start the detector - // and the thread should not be in running state. - if (this.sendLatencyFaultEnable && this.startDetectorEnable) { - // start the detector. - this.latencyFaultTolerance.startDetector(); - } + this.latencyFaultTolerance.startDetector(); } public void shutdown() { - if (this.sendLatencyFaultEnable && this.startDetectorEnable) { - this.latencyFaultTolerance.shutdown(); - } + this.latencyFaultTolerance.shutdown(); } public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index 8b3c2f7c8..898e529f8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -26,7 +26,6 @@ public class MessageQueueView { private final MessageQueueSelector readSelector; private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; - private MQFaultStrategy mqFaultStrategy; public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); @@ -67,12 +66,4 @@ public class MessageQueueView { .add("topicRouteWrapper", topicRouteWrapper) .toString(); } - - public MQFaultStrategy getMQFaultStrategy() { - return mqFaultStrategy; - } - - public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { - this.mqFaultStrategy = mqFaultStrategy; - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 74769a423..caf62a1e0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -127,7 +127,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { @Override public String resolve(String name) { try { - String brokerAddr = getBrokerAddr(null, name); + String brokerAddr = getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name); return brokerAddr; } catch (Exception e) { return null; @@ -175,9 +175,17 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, boolean reachable) { + checkSendFaultToleranceEnable(); this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); } + public void checkSendFaultToleranceEnable() { + boolean hotLatencySwitch = ConfigurationManager.getProxyConfig().isSendLatencyEnable(); + boolean hotDetectorSwitch = ConfigurationManager.getProxyConfig().isStartDetectorEnable(); + this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch); + this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch); + } + public MQFaultStrategy getMqFaultStrategy() { return this.mqFaultStrategy; } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2