Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch017-backport-Convergent-t...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch017-backport-Convergent-thread-pool-creation.patch of Package rocketmq
From c100d815d754d7cb330bc63e145bafd2d9b59cb1 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:13:56 +0800 Subject: [PATCH 1/6] [ISSUE #7328] Convergent thread pool creation (#7329) * Convergence thread pool creation to facilitate subsequent iteration management * Convergence thread pool creation in ThreadPoolMonitor.java * fix unit test * Convergence ThreadPool constructor * Convergence ScheduledThreadPool constructor * remove unused import * Convergence ScheduledThreadPool constructor * remove unused import --------- --- .../rocketmq/broker/BrokerController.java | 39 +++++----- .../client/ClientHousekeepingService.java | 4 +- .../DefaultConsumerIdsChangeListener.java | 3 +- .../broker/controller/ReplicasManager.java | 9 +-- .../dledger/DLedgerRoleChangeHandler.java | 4 +- .../broker/failover/EscapeBridge.java | 4 +- .../broker/latency/BrokerFastFailure.java | 5 +- .../BrokerFixedThreadPoolExecutor.java | 57 -------------- .../broker/latency/FutureTaskExt.java | 39 ---------- .../rocketmq/broker/out/BrokerOuterAPI.java | 7 +- .../schedule/ScheduleMessageService.java | 7 +- .../broker/topic/TopicRouteInfoManager.java | 4 +- ...ractTransactionalMessageCheckListener.java | 4 +- .../rocketmq/broker/BrokerControllerTest.java | 2 +- .../broker/latency/BrokerFastFailureTest.java | 1 + .../common/config/AbstractRocksDBStorage.java | 6 +- .../FutureTaskExtThreadPoolExecutor.java | 3 +- .../common/thread/ThreadPoolMonitor.java | 6 +- .../rocketmq/common/utils/ThreadUtils.java | 74 ++++++++++++++++--- .../rocketmq/container/BrokerContainer.java | 6 +- .../controller/ControllerManager.java | 14 +--- .../controller/impl/DLedgerController.java | 10 +-- .../DefaultBrokerHeartbeatManager.java | 3 +- .../rocketmq/namesrv/NamesrvController.java | 22 ++---- .../grpc/v2/channel/GrpcChannelManager.java | 6 +- .../remoting/RemotingProtocolServer.java | 4 +- .../proxy/service/ClusterServiceManager.java | 12 +-- .../proxy/service/LocalServiceManager.java | 4 +- .../receipt/DefaultReceiptHandleManager.java | 8 +- .../service/route/TopicRouteService.java | 9 +-- .../remoting/netty/NettyRemotingClient.java | 4 +- .../remoting/netty/NettyRemotingServer.java | 4 +- .../rocketmq/store/DefaultMessageStore.java | 8 +- .../ha/autoswitch/AutoSwitchHAService.java | 38 +++++----- .../rocketmq/store/kv/CompactionStore.java | 21 +++--- .../store/queue/ConsumeQueueStore.java | 4 +- .../store/stats/BrokerStatsManager.java | 14 ++-- .../store/timer/TimerMessageStore.java | 6 +- .../apache/rocketmq/test/util/StatUtil.java | 1 - .../common/TieredStoreExecutor.java | 14 ++-- .../tools/admin/DefaultMQAdminExtImpl.java | 3 +- 41 files changed, 215 insertions(+), 278 deletions(-) delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 6aba70cb2..275b64b1a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.latency.BrokerFastFailure; -import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService; import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener; import org.apache.rocketmq.broker.longpolling.PullRequestHoldService; @@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.stats.MomentStatsItem; import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.Configuration; @@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -455,10 +454,10 @@ public class BrokerController { * Initialize resources including remoting server and thread executors. */ protected void initializeResources() { - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity())); - this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, @@ -466,7 +465,7 @@ public class BrokerController { this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity())); - this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, @@ -474,7 +473,7 @@ public class BrokerController { this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity())); - this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getLitePullMessageThreadPoolNums(), this.brokerConfig.getLitePullMessageThreadPoolNums(), 1000 * 60, @@ -482,7 +481,7 @@ public class BrokerController { this.litePullThreadPoolQueue, new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity())); - this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor( + this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getPutMessageFutureThreadPoolNums(), this.brokerConfig.getPutMessageFutureThreadPoolNums(), 1000 * 60, @@ -490,7 +489,7 @@ public class BrokerController { this.putThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity())); - this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getAckMessageThreadPoolNums(), this.brokerConfig.getAckMessageThreadPoolNums(), 1000 * 60, @@ -498,7 +497,7 @@ public class BrokerController { this.ackThreadPoolQueue, new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity())); - this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, @@ -506,7 +505,7 @@ public class BrokerController { this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity())); - this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor( + this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getAdminBrokerThreadPoolNums(), this.brokerConfig.getAdminBrokerThreadPoolNums(), 1000 * 60, @@ -514,7 +513,7 @@ public class BrokerController { this.adminBrokerThreadPoolQueue, new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity())); - this.clientManageExecutor = new BrokerFixedThreadPoolExecutor( + this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, @@ -522,7 +521,7 @@ public class BrokerController { this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity())); - this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( + this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.getHeartbeatThreadPoolNums(), 1000 * 60, @@ -530,7 +529,7 @@ public class BrokerController { this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity())); - this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor( + this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getConsumerManageThreadPoolNums(), this.brokerConfig.getConsumerManageThreadPoolNums(), 1000 * 60, @@ -538,7 +537,7 @@ public class BrokerController { this.consumerManagerThreadPoolQueue, new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity())); - this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getProcessReplyMessageThreadPoolNums(), this.brokerConfig.getProcessReplyMessageThreadPoolNums(), 1000 * 60, @@ -546,7 +545,7 @@ public class BrokerController { this.replyThreadPoolQueue, new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity())); - this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( + this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getEndTransactionThreadPoolNums(), this.brokerConfig.getEndTransactionThreadPoolNums(), 1000 * 60, @@ -554,7 +553,7 @@ public class BrokerController { this.endTransactionThreadPoolQueue, new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity())); - this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor( + this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor( this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), 1000 * 60, @@ -562,9 +561,9 @@ public class BrokerController { this.loadBalanceThreadPoolQueue, new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity())); - this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1, + this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity())); - this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1, + this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity())); this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this); @@ -828,8 +827,6 @@ public class BrokerController { initializeResources(); - registerProcessor(); - initializeScheduledTasks(); initialTransaction(); @@ -1690,6 +1687,8 @@ public class BrokerController { } } }, 10, 5, TimeUnit.SECONDS); + + registerProcessor(); } protected void scheduleSendHeartbeat() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index 98e5f450f..cbb81f632 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener { public ClientHousekeepingService(final BrokerController brokerController) { this.brokerController = brokerController; - scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index 2ce036a0f..d17a2a547 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbstractBrokerRunnable; @@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen private final BrokerController brokerController; private final int cacheSize = 8096; - private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true)); private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 37c82e434..a989e6e68 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.EpochEntry; @@ -107,9 +106,9 @@ public class ReplicasManager { public ReplicasManager(final BrokerController brokerController) { this.brokerController = brokerController; this.brokerOuterAPI = brokerController.getBrokerOuterAPI(); - this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity())); - this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity())); - this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, + this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity())); + this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity())); + this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity())); this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService(); this.brokerConfig = brokerController.getBrokerConfig(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java index 75023ee1b..e6cb97640 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java @@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.MemberState; import io.openmessaging.storage.dledger.utils.DLedgerUtils; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; @@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange this.messageStore = messageStore; this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); this.dLegerServer = dLedgerCommitLog.getdLedgerServer(); - this.executorService = Executors.newSingleThreadExecutor( + this.executorService = ThreadUtils.newSingleThreadExecutor( new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java index 7c350fc1d..6a0817480 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java @@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; @@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -72,7 +72,7 @@ public class EscapeBridge { public void start() throws Exception { if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) { final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000); - this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( + this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d3d0bc8ba..3b6e9dc67 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.future.FutureTaskExt; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.netty.RequestTask; @@ -43,7 +44,7 @@ public class BrokerFastFailure { public BrokerFastFailure(final BrokerController brokerController) { this.brokerController = brokerController; - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true, brokerController == null ? null : brokerController.getBrokerConfig())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java deleted file mode 100644 index d2d1143a3..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.latency; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, - final TimeUnit unit, - final BlockingQueue<Runnable> workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, - final TimeUnit unit, - final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, - final TimeUnit unit, - final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, - final TimeUnit unit, - final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, - final RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - } - - @Override - protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { - return new FutureTaskExt<>(runnable, value); - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java deleted file mode 100644 index f132efaeb..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.latency; - -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -public class FutureTaskExt<V> extends FutureTask<V> { - private final Runnable runnable; - - public FutureTaskExt(final Callable<V> callable) { - super(callable); - this.runnable = null; - } - - public FutureTaskExt(final Runnable runnable, final V result) { - super(runnable, result); - this.runnable = runnable; - } - - public Runnable getRunnable() { - return runnable; - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index ae81e8b11..9dfb8127d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.DefaultTopAddressing; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; @@ -144,7 +145,7 @@ public class BrokerOuterAPI { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr()); - private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, + private final ExecutorService brokerOuterExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); private final ClientMetadata clientMetadata; private final RpcClient rpcClient; @@ -1092,7 +1093,7 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() { + public ExecutorService getBrokerOuterExecutor() { return brokerOuterExecutor; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 297b14207..0c2e6507b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager { public ScheduleMessageService(final BrokerController brokerController) { this.brokerController = brokerController; this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); - scheduledPersistService = new ScheduledThreadPoolExecutor(1, + scheduledPersistService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); } @@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager { public void start() { if (started.compareAndSet(false, true)) { this.load(); - this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); + this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); if (this.enableAsyncDeliver) { - this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); + this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); } for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java index b35564725..11bde5f5f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -66,7 +66,7 @@ public class TopicRouteInfoManager { } public void start() { - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread")); + this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread")); this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 771d84300..982355d78 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction; import io.netty.channel.Channel; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; @@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; @@ -97,7 +97,7 @@ public abstract class AbstractTransactionalMessageCheckListener { public synchronized void initExecutorService() { if (executorService == null) { - executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), + executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactoryImpl("Transaction-msg-check-thread", brokerController.getBrokerIdentity()), new CallerRunsPolicy()); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 75ad961ce..6035a20ac 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.broker.latency.FutureTaskExt; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.RequestTask; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java index 5d0f7f9d7..31b547cf1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.remoting.netty.RequestTask; import org.junit.Test; diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index a720a5be3..6f19a9815 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -33,6 +32,7 @@ import com.google.common.collect.Maps; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.rocksdb.ColumnFamilyDescriptor; @@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage { private volatile boolean closed; private final Semaphore reloadPermit = new Semaphore(1); - private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); - private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor( + private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); + private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor( 1, 1, 1000 * 60, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1), new ThreadFactoryImpl("RocksDBManualCompactionService_"), diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java index 411da9221..7b68873a9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java +++ b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java @@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends ThreadPoolExecutor { public FutureTaskExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java index 49d97a5d7..1bfabbffe 100644 --- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java @@ -22,12 +22,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -36,7 +36,7 @@ public class ThreadPoolMonitor { private static Logger waterMarkLogger = LoggerFactory.getLogger(ThreadPoolMonitor.class); private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>(); - private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor( + private static final ScheduledExecutorService MONITOR_SCHEDULED = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build() ); @@ -81,7 +81,7 @@ public class ThreadPoolMonitor { String name, int queueCapacity, List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) { - ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor( + ThreadPoolExecutor executor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java index 4b366d4e3..1644c6360 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java @@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public final class ThreadUtils { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME); - public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) { - return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { + return ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); } - public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return ThreadUtils.newThreadPoolExecutor(1, threadFactory); + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { + return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + threadFactory); + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, + String processName, + boolean isDaemon) { + return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + } + + public static ExecutorService newThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, + final ThreadFactory threadFactory) { + return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy()); + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + return new FutureTaskExtThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); + return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return ThreadUtils.newScheduledThreadPool(1, threadFactory); + } + + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return ThreadUtils.newScheduledThreadPool(corePoolSize, Executors.defaultThreadFactory()); } - public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String processName, boolean isDaemon) { - return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); + return ThreadUtils.newScheduledThreadPool(corePoolSize, newThreadFactory(processName, isDaemon)); + } + + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { + return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy()); + } + + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler); } public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { - return newGenericThreadFactory("Remoting-" + processName, isDaemon); + return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon); } public static ThreadFactory newGenericThreadFactory(String processName) { diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java index c6446f058..5b712bc30 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java @@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class BrokerContainer implements IBrokerContainer { private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new BasicThreadFactory.Builder() .namingPattern("BrokerContainerScheduledThread") .daemon(true) @@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService); this.fastRemotingServer = this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 2); - this.brokerContainerExecutor = new ThreadPoolExecutor( + this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor( 1, 1, 1000 * 60, diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 7c91e70da..3e6b0eba5 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.future.FutureTaskExt; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; import org.apache.rocketmq.controller.impl.DLedgerController; import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager; @@ -93,18 +91,14 @@ public class ControllerManager { public boolean initialize() { this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity()); - this.controllerRequestExecutor = new ThreadPoolExecutor( + this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor( this.controllerConfig.getControllerThreadPoolNums(), this.controllerConfig.getControllerThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.controllerRequestThreadPoolQueue, - new ThreadFactoryImpl("ControllerRequestExecutorThread_")) { - @Override - protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { - return new FutureTaskExt<T>(runnable, value); - } - }; + new ThreadFactoryImpl("ControllerRequestExecutorThread_")); + this.notifyService.initialize(); if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) { throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty"); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java index fa91f288e..33e4406e4 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.controller.Controller; import org.apache.rocketmq.controller.elect.ElectPolicy; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; @@ -66,11 +66,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; @@ -136,7 +136,7 @@ public class DLedgerController implements Controller { this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener); this.dLedgerServer.registerStateMachine(this.statemachine); this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler); - this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_")); + this.scanInactiveMasterService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_")); this.brokerLifecycleListeners = new ArrayList<>(); } @@ -513,7 +513,7 @@ public class DLedgerController implements Controller { class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); + private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java index 2fbddb9cd..6ebb2c994 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.controller.BrokerHeartbeatManager; import org.apache.rocketmq.controller.helper.BrokerLifecycleListener; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { @Override public void initialize() { - this.scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_")); + this.scheduledService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_")); this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_")); } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 15c65ebec..be327cffa 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -20,10 +20,7 @@ import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager; @@ -62,10 +60,10 @@ public class NamesrvController { private final NettyServerConfig nettyServerConfig; private final NettyClientConfig nettyClientConfig; - private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build()); - private final ScheduledExecutorService scanExecutorService = new ScheduledThreadPoolExecutor(1, + private final ScheduledExecutorService scanExecutorService = ThreadUtils.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build()); private final KVConfigManager kvConfigManager; @@ -138,20 +136,10 @@ public class NamesrvController { private void initiateThreadExecutors() { this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity()); - this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) { - @Override - protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { - return new FutureTaskExt<>(runnable, value); - } - }; + this.defaultExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")); this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity()); - this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) { - @Override - protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { - return new FutureTaskExt<>(runnable, value); - } - }; + this.clientRequestExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")); } private void initiateSslContext() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java index 14330dd8d..a18cf7600 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java @@ -21,13 +21,13 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; @@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown { protected final AtomicLong nonceIdGenerator = new AtomicLong(0); protected final ConcurrentMap<String /* nonce */, ResultFuture> resultNonceFutureMap = new ConcurrentHashMap<>(); - protected final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + protected final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryImpl("GrpcChannelManager_") ); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index bcc9edd09..fe07090d5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -178,7 +178,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue()) ); - this.timerExecutor = Executors.newSingleThreadScheduledExecutor( + this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build() ); this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java index d2ddfc352..9786cec55 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.proxy.service; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.client.ClientChannelInfo; @@ -27,23 +26,24 @@ import org.apache.rocketmq.broker.client.ProducerChangeListener; import org.apache.rocketmq.broker.client.ProducerGroupEvent; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.client.common.NameserverAccessConfig; +import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.admin.DefaultAdminService; import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager; +import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; import org.apache.rocketmq.proxy.service.message.ClusterMessageService; import org.apache.rocketmq.proxy.service.message.MessageService; import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService; import org.apache.rocketmq.proxy.service.metadata.MetadataService; -import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; -import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService; @@ -73,7 +73,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); NameserverAccessConfig nameserverAccessConfig = new NameserverAccessConfig(proxyConfig.getNamesrvAddr(), proxyConfig.getNamesrvDomain(), proxyConfig.getNamesrvDomainSubgroup()); - this.scheduledExecutorService = Executors.newScheduledThreadPool(3); + this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3); this.messagingClientAPIFactory = new MQClientAPIFactory( nameserverAccessConfig, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java index 4d1ca7b66..59cd92685 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.proxy.service; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; @@ -28,6 +27,7 @@ import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.admin.AdminService; @@ -58,7 +58,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser private final MQClientAPIFactory mqClientAPIFactory; private final ChannelManager channelManager; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryImpl("LocalServiceManagerScheduledThread")); public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index 69f44344a..207603fe8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; +import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; @@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem protected final StateEventListener<RenewEvent> eventListener; protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); protected final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); protected final ThreadPoolExecutor renewalWorkerService; public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index caf62a1e0..ccf094c03 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.base.Optional; import java.time.Duration; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import com.google.common.base.Optional; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.Resolver; import org.apache.rocketmq.client.latency.ServiceDetector; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.Address; @@ -63,7 +62,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryImpl("TopicRouteService_") ); this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 8491f4354..64621dd6c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_")); - this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, + this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_")); if (eventLoopGroup != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index e626260c9..aa0d46542 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.BinaryUtil; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } private ScheduledExecutorService buildScheduleExecutor() { - return new ScheduledThreadPoolExecutor(1, + return ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("NettyServerScheduler_", true), new ThreadPoolExecutor.DiscardOldestPolicy()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index f2a54ddf6..02ea47f13 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore { private ConcurrentMap<String, TopicConfig> topicConfigTable; private final ScheduledExecutorService scheduledCleanQueueExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread")); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread")); public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws IOException { @@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore { this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog()); this.scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); @@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore { private final ExecutorService batchDispatchRequestExecutor; public MainBatchDispatchRequestService() { - batchDispatchRequestExecutor = new ThreadPoolExecutor( + batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor( DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), 1000 * 60, diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index d5393fdca..f20bc3e28 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -17,10 +17,26 @@ package org.apache.rocketmq.store.ha.autoswitch; - +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.EpochEntry; @@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient; import org.apache.rocketmq.store.ha.HAConnection; import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService; -import java.io.IOException; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Iterator; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; -import java.util.stream.Collectors; - /** * SwitchAble ha service, support switch role to master or slave. */ public class AutoSwitchHAService extends DefaultHAService { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); + private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); private final ConcurrentHashMap<Long/*brokerId*/, Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); private final List<Consumer<Set<Long/*brokerId*/>>> syncStateSetChangedListeners = new ArrayList<>(); private final Set<Long/*brokerId*/> syncStateSet = new HashSet<>(); diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java index b37c90726..639084fa2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java @@ -16,17 +16,25 @@ */ package org.apache.rocketmq.store.kv; -import java.util.Random; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.attribute.CleanupPolicy; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; @@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - public class CompactionStore { public static final String COMPACTION_DIR = "compaction"; @@ -76,7 +75,7 @@ public class CompactionStore { this.positionMgr = new CompactionPositionMgr(compactionPath); this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, config.getCompactionThreadNum())); - this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum, + this.compactionSchedule = ThreadUtils.newScheduledThreadPool(this.compactionThreadNum, new ThreadFactoryImpl("compactionSchedule_")); this.offsetMapSize = config.getMaxOffsetMapSize() / compactionThreadNum; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 8d38503b3..d03d15d65 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; @@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.QueueTypeUtils; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.CommitLog; @@ -175,7 +175,7 @@ public class ConsumeQueueStore { } private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQueue, String threadNamePrefix) { - return new ThreadPoolExecutor( + return ThreadUtils.newThreadPoolExecutor( this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), 1000 * 60, diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 2dd3fc5b5..489d7b4fb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.store.stats; import java.util.HashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.BrokerConfig; @@ -32,13 +31,14 @@ import org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter; import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter; import org.apache.rocketmq.common.statistics.StatisticsKindMeta; import org.apache.rocketmq.common.statistics.StatisticsManager; +import org.apache.rocketmq.common.stats.MomentStatsItemSet; import org.apache.rocketmq.common.stats.Stats; +import org.apache.rocketmq.common.stats.StatsItem; +import org.apache.rocketmq.common.stats.StatsItemSet; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.stats.MomentStatsItemSet; -import org.apache.rocketmq.common.stats.StatsItem; -import org.apache.rocketmq.common.stats.StatsItemSet; public class BrokerStatsManager { @@ -281,11 +281,11 @@ public class BrokerStatsManager { private void initScheduleService() { this.scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig)); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig)); this.commercialExecutor = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig)); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig)); this.accountExecutor = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig)); + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig)); } public MomentStatsItemSet getMomentStatsItemSetFallSize() { diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 181f7087a..0d50de65a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -35,7 +35,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.ConsumeQueue; @@ -174,11 +174,11 @@ public class TimerMessageStore { this.lastBrokerRole = storeConfig.getBrokerRole(); if (messageStore instanceof DefaultMessageStore) { - scheduler = Executors.newSingleThreadScheduledExecutor( + scheduler = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryImpl("TimerScheduledThread", ((DefaultMessageStore) messageStore).getBrokerIdentity())); } else { - scheduler = Executors.newSingleThreadScheduledExecutor( + scheduler = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryImpl("TimerScheduledThread")); } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java index f3d105bc6..080b7e385 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java @@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import javax.annotation.Generated; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java index 6dd0e8846..65d586f43 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java @@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.utils.ThreadUtils; public class TieredStoreExecutor { @@ -43,20 +43,20 @@ public class TieredStoreExecutor { public static ExecutorService compactIndexFileExecutor; public static void init() { - commonScheduledExecutor = new ScheduledThreadPoolExecutor( + commonScheduledExecutor = ThreadUtils.newScheduledThreadPool( Math.max(4, Runtime.getRuntime().availableProcessors()), new ThreadFactoryImpl("TieredCommonExecutor_")); - commitExecutor = new ScheduledThreadPoolExecutor( + commitExecutor = ThreadUtils.newScheduledThreadPool( Math.max(16, Runtime.getRuntime().availableProcessors() * 4), new ThreadFactoryImpl("TieredCommitExecutor_")); - cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor( + cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool( Math.max(4, Runtime.getRuntime().availableProcessors()), new ThreadFactoryImpl("TieredCleanFileExecutor_")); dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); - dispatchExecutor = new ThreadPoolExecutor( + dispatchExecutor = ThreadUtils.newThreadPoolExecutor( Math.max(2, Runtime.getRuntime().availableProcessors()), Math.max(16, Runtime.getRuntime().availableProcessors() * 4), 1000 * 60, @@ -66,7 +66,7 @@ public class TieredStoreExecutor { new ThreadPoolExecutor.DiscardOldestPolicy()); fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); - fetchDataExecutor = new ThreadPoolExecutor( + fetchDataExecutor = ThreadUtils.newThreadPoolExecutor( Math.max(16, Runtime.getRuntime().availableProcessors() * 4), Math.max(64, Runtime.getRuntime().availableProcessors() * 8), 1000 * 60, @@ -75,7 +75,7 @@ public class TieredStoreExecutor { new ThreadFactoryImpl("TieredFetchExecutor_")); compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); - compactIndexFileExecutor = new ThreadPoolExecutor( + compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor( 1, 1, 1000 * 60, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index fa3596d51..1ebff6d8a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.RPCHook; @@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { int threadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20")); - this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_")); + this.threadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_")); break; case RUNNING: -- 2.32.0.windows.2 From dad6b4dadfec7a58e78a6715ec16c2eb6b17ff27 Mon Sep 17 00:00:00 2001 From: Ziyi Tan <ajb459684460@gmail.com> Date: Mon, 11 Sep 2023 14:34:10 +0800 Subject: [PATCH 2/6] [ISSUE #7334] `registerIncrementBrokerData` for single topic update (#7335) Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com> --- .../broker/topic/TopicConfigManager.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 4e3c1736c..754605438 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -290,7 +290,11 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true, true); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } return topicConfig; @@ -394,7 +398,11 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true, true); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } return topicConfig; @@ -435,7 +443,11 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true, true); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } return topicConfig; @@ -461,7 +473,11 @@ public class TopicConfigManager extends ConfigManager { dataVersion.nextVersion(stateMachineVersion); this.persist(); - this.brokerController.registerBrokerAll(false, true, true); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } } @@ -484,7 +500,11 @@ public class TopicConfigManager extends ConfigManager { dataVersion.nextVersion(stateMachineVersion); this.persist(); - this.brokerController.registerBrokerAll(false, true, true); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } } -- 2.32.0.windows.2 From 0dbd0772b99f618f757d42cd64542b83e2100e4f Mon Sep 17 00:00:00 2001 From: Ziyi Tan <ajb459684460@gmail.com> Date: Mon, 11 Sep 2023 15:48:07 +0800 Subject: [PATCH 3/6] [ISSUE #7326] Split the request to register to the nameserver (#7325) Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com> --- .../rocketmq/broker/BrokerController.java | 41 +++++++++++-------- .../broker/topic/TopicConfigManager.java | 21 ++++++++++ .../apache/rocketmq/common/BrokerConfig.java | 24 +++++++++++ .../test/route/CreateAndUpdateTopicIT.java | 31 ++++++++++++++ 4 files changed, 99 insertions(+), 18 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 275b64b1a..9e49f636d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1765,29 +1765,34 @@ public class BrokerController { } public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { + ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable(); + ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); - TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); - - topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion()); - topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable()); - - topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map( - entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) - ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); - - if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { - ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); - for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { - TopicConfig tmp = + for (TopicConfig topicConfig : topicConfigMap.values()) { + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + topicConfigTable.put(topicConfig.getTopicName(), new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); - topicConfigTable.put(topicConfig.getTopicName(), tmp); + topicConfig.getPerm() & getBrokerConfig().getBrokerPermission())); + } else { + topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + if (this.brokerConfig.isEnableSplitRegistration() + && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable); + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + topicConfigTable.clear(); } - topicConfigWrapper.setTopicConfigTable(topicConfigTable); } - if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), + Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream() + .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager(). + buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap); + if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 754605438..8537929be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; @@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; import static com.google.common.base.Preconditions.checkNotNull; @@ -609,6 +612,24 @@ public class TopicConfigManager extends ConfigManager { return topicConfigSerializeWrapper; } + public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) { + return buildSerializeWrapper(topicConfigTable, Maps.newHashMap()); + } + + public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper( + final ConcurrentMap<String, TopicConfig> topicConfigTable, + final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap + ) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); + topicConfigWrapper.setTopicConfigTable(topicConfigTable); + topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); + topicConfigWrapper.setDataVersion(this.getDataVersion()); + if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) { + this.getDataVersion().nextVersion(); + } + return topicConfigWrapper; + } + @Override public String encode() { return encode(false); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 45d26b29c..0d248c4e1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity { private boolean enableMixedMessageType = false; + /** + * This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time, + * otherwise there will be a loss of routing + */ + private boolean enableSplitRegistration = false; + + private int splitRegistrationSize = 800; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1731,4 +1739,20 @@ public class BrokerConfig extends BrokerIdentity { public void setEnableMixedMessageType(boolean enableMixedMessageType) { this.enableMixedMessageType = enableMixedMessageType; } + + public boolean isEnableSplitRegistration() { + return enableSplitRegistration; + } + + public void setEnableSplitRegistration(boolean enableSplitRegistration) { + this.enableSplitRegistration = enableSplitRegistration; + } + + public int getSplitRegistrationSize() { + return splitRegistrationSize; + } + + public void setSplitRegistrationSize(int splitRegistrationSize) { + this.splitRegistrationSize = splitRegistrationSize; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java index 7e3c7b871..2370e68c0 100644 --- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.route; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.util.MQAdminTestUtils; @@ -111,4 +112,34 @@ public class CreateAndUpdateTopicIT extends BaseConf { brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); } + + @Test + public void testCreateOrUpdateTopic_EnableSplitRegistration() { + brokerController1.getBrokerConfig().setEnableSplitRegistration(true); + brokerController2.getBrokerConfig().setEnableSplitRegistration(true); + brokerController3.getBrokerConfig().setEnableSplitRegistration(true); + + String testTopic = "test-topic-"; + + for (int i = 0; i < 1000; i++) { + TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8); + brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig); + brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig); + brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig); + } + + brokerController1.registerBrokerAll(false, true, true); + brokerController2.registerBrokerAll(false, true, true); + brokerController3.registerBrokerAll(false, true, true); + + for (int i = 0; i < 1000; i++) { + TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i); + assertThat(route.getBrokerDatas()).hasSize(3); + assertThat(route.getQueueDatas()).hasSize(3); + } + + brokerController1.getBrokerConfig().setEnableSplitRegistration(false); + brokerController2.getBrokerConfig().setEnableSplitRegistration(false); + brokerController3.getBrokerConfig().setEnableSplitRegistration(false); + } } -- 2.32.0.windows.2 From a9e353285cea762b0c5eab567bdfa8e5c8c2d279 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Mon, 11 Sep 2023 15:55:18 +0800 Subject: [PATCH 4/6] Add the configuration of topicQueueLock number to better support different scenarios (#7317) --- .../main/java/org/apache/rocketmq/store/CommitLog.java | 2 +- .../java/org/apache/rocketmq/store/TopicQueueLock.java | 8 ++++++++ .../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index e6ee3bacc..456bf2b86 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -122,7 +122,7 @@ public class CommitLog implements Swappable { this.flushDiskWatcher = new FlushDiskWatcher(); - this.topicQueueLock = new TopicQueueLock(); + this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum()); this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java index a78eeed23..5a131b5c3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java +++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java @@ -34,6 +34,14 @@ public class TopicQueueLock { } } + public TopicQueueLock(int size) { + this.size = size; + this.lockList = new ArrayList<>(size); + for (int i = 0; i < this.size; i++) { + this.lockList.add(new ReentrantLock()); + } + } + public void lock(String topicQueueKey) { Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size); lock.lock(); diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index efb728ac0..9fa448043 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -401,6 +401,8 @@ public class MessageStoreConfig { private long memTableFlushInterval = 60 * 60 * 1000L; private boolean enableRocksDBLog = false; + private int topicQueueLockNum = 32; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -1751,4 +1753,12 @@ public class MessageStoreConfig { public void setEnableRocksDBLog(boolean enableRocksDBLog) { this.enableRocksDBLog = enableRocksDBLog; } + + public int getTopicQueueLockNum() { + return topicQueueLockNum; + } + + public void setTopicQueueLockNum(int topicQueueLockNum) { + this.topicQueueLockNum = topicQueueLockNum; + } } -- 2.32.0.windows.2 From 57f04c95d3a2ba6b91583058a6e4eda209f72d6e Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Mon, 11 Sep 2023 18:23:25 +0800 Subject: [PATCH 5/6] [ISSUE #7343] Rollback modifications to registerProcessor Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../java/org/apache/rocketmq/broker/BrokerController.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9e49f636d..13a3feb4e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -827,6 +827,8 @@ public class BrokerController { initializeResources(); + registerProcessor(); + initializeScheduledTasks(); initialTransaction(); @@ -1687,8 +1689,6 @@ public class BrokerController { } } }, 10, 5, TimeUnit.SECONDS); - - registerProcessor(); } protected void scheduleSendHeartbeat() { -- 2.32.0.windows.2 From dad6ad09d13dadc36b6342671c77f619bbb8c522 Mon Sep 17 00:00:00 2001 From: Ao Qiao <qiao_ao@foxmail.com> Date: Tue, 12 Sep 2023 08:28:45 +0800 Subject: [PATCH 6/6] [ISSUE #7340] Abstract Duplicate code into a method in `TopicConfigManager` (#7341) --- .../broker/topic/TopicConfigManager.java | 44 ++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 8537929be..511d29e12 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -293,11 +293,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } return topicConfig; @@ -337,11 +333,7 @@ public class TopicConfigManager extends ConfigManager { log.error("createTopicIfAbsent ", e); } if (createNew && register) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } return getTopicConfig(topicConfig.getTopicName()); } @@ -401,11 +393,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } return topicConfig; @@ -446,11 +434,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } return topicConfig; @@ -476,11 +460,7 @@ public class TopicConfigManager extends ConfigManager { dataVersion.nextVersion(stateMachineVersion); this.persist(); - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } } @@ -503,11 +483,7 @@ public class TopicConfigManager extends ConfigManager { dataVersion.nextVersion(stateMachineVersion); this.persist(); - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + registerBrokerData(topicConfig); } } @@ -699,6 +675,14 @@ public class TopicConfigManager extends ConfigManager { } } + private void registerBrokerData(TopicConfig topicConfig) { + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } + } + public boolean containsTopic(String topic) { return topicConfigTable.containsKey(topic); } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2