Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch003-backport-feature-refa...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch003-backport-feature-refactor-recipt-processor.patch of Package rocketmq
From d1bcda57b32f7ee033a3cb0067aef781dc12b7f1 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Mon, 3 Jul 2023 14:09:21 +0800 Subject: [PATCH] [ISSUE #6974] Feature/refector receipt processor (#6975) * Refector ReceiptHandleProcessor --- .../common/state/StateEventListener.java | 22 + .../rocketmq/proxy/common/RenewEvent.java | 45 ++ .../grpc/v2/DefaultGrpcMessingActivity.java | 12 +- .../grpc/v2/consumer/AckMessageActivity.java | 8 +- .../ChangeInvisibleDurationActivity.java | 6 +- .../v2/consumer/ReceiveMessageActivity.java | 7 +- .../producer/ForwardMessageToDLQActivity.java | 7 +- .../processor/DefaultMessagingProcessor.java | 16 +- .../proxy/processor/MessagingProcessor.java | 5 + .../processor/ReceiptHandleProcessor.java | 292 ++----------- .../service/receipt/ReceiptHandleManager.java | 282 +++++++++++++ .../v2/consumer/AckMessageActivityTest.java | 2 +- .../ChangeInvisibleDurationActivityTest.java | 4 +- .../consumer/ReceiveMessageActivityTest.java | 2 +- .../ForwardMessageToDLQActivityTest.java | 4 +- .../processor/ConsumerProcessorTest.java | 1 - .../receipt/ReceiptHandleManagerTest.java} | 389 ++++-------------- 17 files changed, 499 insertions(+), 605 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java rename proxy/src/test/java/org/apache/rocketmq/proxy/{processor/ReceiptHandleProcessorTest.java => service/receipt/ReceiptHandleManagerTest.java} (63%) diff --git a/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java new file mode 100644 index 000000000..aed04dc31 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.state; + +public interface StateEventListener<T> { + void fireEvent(T event); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java new file mode 100644 index 000000000..fdf9833cc --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.consumer.AckResult; + +public class RenewEvent { + protected MessageReceiptHandle messageReceiptHandle; + protected long renewTime; + protected CompletableFuture<AckResult> future; + + public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture<AckResult> future) { + this.messageReceiptHandle = messageReceiptHandle; + this.renewTime = renewTime; + this.future = future; + } + + public MessageReceiptHandle getMessageReceiptHandle() { + return messageReceiptHandle; + } + + public long getRenewTime() { + return renewTime; + } + + public CompletableFuture<AckResult> getFuture() { + return future; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index 73b764bc4..091e9086e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -55,14 +55,12 @@ import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity; import org.apache.rocketmq.proxy.grpc.v2.transaction.EndTransactionActivity; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown implements GrpcMessingActivity { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected GrpcClientSettingsManager grpcClientSettingsManager; protected GrpcChannelManager grpcChannelManager; - protected ReceiptHandleProcessor receiptHandleProcessor; protected ReceiveMessageActivity receiveMessageActivity; protected AckMessageActivity ackMessageActivity; protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity; @@ -79,18 +77,16 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme protected void init(MessagingProcessor messagingProcessor) { this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor); this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(), this.grpcClientSettingsManager); - this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); - this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); - this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); - this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.endTransactionActivity = new EndTransactionActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.routeActivity = new RouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.appendStartAndShutdown(this.receiptHandleProcessor); this.appendStartAndShutdown(this.grpcClientSettingsManager); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java index 993f069b9..9a3a77201 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java @@ -37,16 +37,12 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; public class AckMessageActivity extends AbstractMessingActivity { - protected ReceiptHandleProcessor receiptHandleProcessor; - public AckMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, - GrpcClientSettingsManager grpcClientSettingsManager, + public AckMessageActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.receiptHandleProcessor = receiptHandleProcessor; } public CompletableFuture<AckMessageResponse> ackMessage(ProxyContext ctx, AckMessageRequest request) { @@ -98,7 +94,7 @@ public class AckMessageActivity extends AbstractMessingActivity { String handleString = ackMessageEntry.getReceiptHandle(); String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); if (messageReceiptHandle != null) { handleString = messageReceiptHandle.getReceiptHandleStr(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java index 9b7e947e0..02356c497 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java @@ -32,16 +32,12 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; public class ChangeInvisibleDurationActivity extends AbstractMessingActivity { - protected ReceiptHandleProcessor receiptHandleProcessor; public ChangeInvisibleDurationActivity(MessagingProcessor messagingProcessor, - ReceiptHandleProcessor receiptHandleProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.receiptHandleProcessor = receiptHandleProcessor; } public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx, @@ -55,7 +51,7 @@ public class ChangeInvisibleDurationActivity extends AbstractMessingActivity { ReceiptHandle receiptHandle = ReceiptHandle.decode(request.getReceiptHandle()); String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); if (messageReceiptHandle != null) { receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index 9830e7dac..a504179a9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -40,7 +40,6 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.processor.QueueSelector; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueSelector; import org.apache.rocketmq.proxy.service.route.MessageQueueView; @@ -48,13 +47,11 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class ReceiveMessageActivity extends AbstractMessingActivity { - protected ReceiptHandleProcessor receiptHandleProcessor; private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; - public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, + public ReceiveMessageActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.receiptHandleProcessor = receiptHandleProcessor; } public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, @@ -145,7 +142,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity { MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); - receiptHandleProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), messageReceiptHandle); + messagingProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), messageReceiptHandle); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java index 6b5c5c7e0..f1fc5a143 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java @@ -28,16 +28,13 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class ForwardMessageToDLQActivity extends AbstractMessingActivity { - protected ReceiptHandleProcessor receiptHandleProcessor; - public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, + public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); - this.receiptHandleProcessor = receiptHandleProcessor; } public CompletableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(ProxyContext ctx, @@ -48,7 +45,7 @@ public class ForwardMessageToDLQActivity extends AbstractMessingActivity { String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); String handleString = request.getReceiptHandle(); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); if (messageReceiptHandle != null) { handleString = messageReceiptHandle.getReceiptHandleStr(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index e663ae1ba..1b3f0af4e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -22,7 +22,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; @@ -41,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; @@ -64,6 +64,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen protected TransactionProcessor transactionProcessor; protected ClientProcessor clientProcessor; protected RequestBrokerProcessor requestBrokerProcessor; + protected ReceiptHandleProcessor receiptHandleProcessor; protected ThreadPoolExecutor producerProcessorExecutor; protected ThreadPoolExecutor consumerProcessorExecutor; @@ -95,6 +96,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen this.transactionProcessor = new TransactionProcessor(this, serviceManager); this.clientProcessor = new ClientProcessor(this, serviceManager); this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager); + this.receiptHandleProcessor = new ReceiptHandleProcessor(this, serviceManager); this.init(); } @@ -308,4 +310,16 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen public MetadataService getMetadataService() { return this.serviceManager.getMetadataService(); } + + @Override + public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + MessageReceiptHandle messageReceiptHandle) { + receiptHandleProcessor.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle); + } + + @Override + public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + String receiptHandle) { + return receiptHandleProcessor.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 263068965..d86be0bd8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.service.metadata.MetadataService; @@ -299,4 +300,8 @@ public interface MessagingProcessor extends StartAndShutdown { ProxyRelayService getProxyRelayService(); MetadataService getMetadataService(); + + void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); + + MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 88c597e99..9c7e8dea9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -19,291 +19,51 @@ package org.apache.rocketmq.proxy.processor; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import com.google.common.base.Stopwatch; import io.netty.channel.Channel; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.broker.client.ClientChannelInfo; -import org.apache.rocketmq.broker.client.ConsumerGroupEvent; -import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; -import org.apache.rocketmq.client.consumer.AckResult; -import org.apache.rocketmq.client.consumer.AckStatus; -import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ReceiptHandle; -import org.apache.rocketmq.common.thread.ThreadPoolMonitor; -import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.proxy.common.MessageReceiptHandle; -import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.common.ProxyException; -import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; -import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; -import org.apache.rocketmq.common.utils.StartAndShutdown; -import org.apache.rocketmq.proxy.common.channel.ChannelHelper; -import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; -import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.state.StateEventListener; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.RenewEvent; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager; +import org.apache.rocketmq.proxy.service.ServiceManager; -public class ReceiptHandleProcessor extends AbstractStartAndShutdown { +public class ReceiptHandleProcessor extends AbstractProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; - protected final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); - protected ThreadPoolExecutor renewalWorkerService; - protected final MessagingProcessor messagingProcessor; - protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); - - public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) { - this.messagingProcessor = messagingProcessor; - this.receiptHandleGroupMap = new ConcurrentHashMap<>(); - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( - proxyConfig.getRenewThreadPoolNums(), - proxyConfig.getRenewMaxThreadPoolNums(), - 1, TimeUnit.MINUTES, - "RenewalWorkerThread", - proxyConfig.getRenewThreadPoolQueueCapacity() - ); - this.init(); - } - - protected void init() { - this.registerConsumerListener(); - this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); - this.appendStartAndShutdown(new StartAndShutdown() { - @Override - public void start() throws Exception { - scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, - ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public void shutdown() throws Exception { - scheduledExecutorService.shutdown(); - clearAllHandle(); - } - }); - } - - protected void registerConsumerListener() { - this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListener() { - @Override - public void handle(ConsumerGroupEvent event, String group, Object... args) { - if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { - if (args == null || args.length < 1) { + protected ReceiptHandleManager receiptHandleManager; + + public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { + super(messagingProcessor, serviceManager); + StateEventListener<RenewEvent> eventListener = event -> { + ProxyContext context = createContext("RenewMessage"); + MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); + ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); + messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), + messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) + .whenComplete((v, t) -> { + if (t != null) { + event.getFuture().completeExceptionally(t); return; } - if (args[0] instanceof ClientChannelInfo) { - ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { - // if the channel sync from other proxy is expired, not to clear data of connect to current proxy - return; - } - clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); - log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); - } - } - } - - @Override - public void shutdown() { - - } - }); + event.getFuture().complete(v); + }); + }; + this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); } protected ProxyContext createContext(String actionName) { return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); } - protected void scheduleRenewTask() { - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { - ReceiptHandleGroupKey key = entry.getKey(); - if (clientIsOffline(key)) { - clearGroup(key); - continue; - } - - ReceiptHandleGroup group = entry.getValue(); - group.scan((msgID, handleStr, v) -> { - long current = System.currentTimeMillis(); - ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); - if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { - return; - } - renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); - }); - } - } catch (Exception e) { - log.error("unexpect error when schedule renew task", e); - } - - log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); - } - - protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { - try { - group.computeIfPresent(msgID, handleStr, this::startRenewMessage); - } catch (Exception e) { - log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); - } - } - - protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { - CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - ProxyContext context = createContext("RenewMessage"); - ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); - long current = System.currentTimeMillis(); - try { - if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { - log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); - return CompletableFuture.completedFuture(null); - } - if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { - CompletableFuture<AckResult> future = - messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), - messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes())); - future.whenComplete((ackResult, throwable) -> { - if (throwable != null) { - log.error("error when renew. handle:{}", messageReceiptHandle, throwable); - if (renewExceptionNeedRetry(throwable)) { - messageReceiptHandle.incrementAndGetRenewRetryTimes(); - resFuture.complete(messageReceiptHandle); - } else { - resFuture.complete(null); - } - } else if (AckStatus.OK.equals(ackResult.getStatus())) { - messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); - messageReceiptHandle.resetRenewRetryTimes(); - messageReceiptHandle.incrementRenewTimes(); - resFuture.complete(messageReceiptHandle); - } else { - log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); - resFuture.complete(null); - } - }); - } else { - SubscriptionGroupConfig subscriptionGroupConfig = - messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); - if (subscriptionGroupConfig == null) { - log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); - return CompletableFuture.completedFuture(null); - } - RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); - CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context, - handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), - messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes())); - future.whenComplete((ackResult, throwable) -> { - if (throwable != null) { - log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); - } - resFuture.complete(null); - }); - } - } catch (Throwable t) { - log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); - resFuture.complete(null); - } - return resFuture; - } - - protected boolean renewExceptionNeedRetry(Throwable t) { - t = ExceptionUtils.getRealException(t); - if (t instanceof ProxyException) { - ProxyException proxyException = (ProxyException) t; - if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || - ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { - return false; - } - } - return true; - } - - protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { - return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.channel) == null; - } - public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { - this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, messageReceiptHandle); - } - - protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, MessageReceiptHandle messageReceiptHandle) { - if (key == null) { - return; - } - ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key, - k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle); } public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) { - return this.removeReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle); - } - - protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle) { - if (key == null) { - return null; - } - ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key); - if (handleGroup == null) { - return null; - } - return handleGroup.remove(msgID, receiptHandle); - } - - protected void clearGroup(ReceiptHandleGroupKey key) { - if (key == null) { - return; - } - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - ProxyContext context = createContext("ClearGroup"); - ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); - if (handleGroup == null) { - return; - } - handleGroup.scan((msgID, handle, v) -> { - try { - handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { - ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); - messagingProcessor.changeInvisibleTime( - context, - receiptHandle, - messageReceiptHandle.getMessageId(), - messageReceiptHandle.getGroup(), - messageReceiptHandle.getTopic(), - proxyConfig.getInvisibleTimeMillisWhenClear() - ); - return CompletableFuture.completedFuture(null); - }); - } catch (Exception e) { - log.error("error when clear handle for group. key:{}", key, e); - } - }); - } - - protected void clearAllHandle() { - log.info("start clear all handle in receiptHandleProcessor"); - Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); - for (ReceiptHandleGroupKey key : keySet) { - clearGroup(key); - } - log.info("clear all handle in receiptHandleProcessor done"); + return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle); } public static class ReceiptHandleGroupKey { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java new file mode 100644 index 000000000..f3b805624 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.receipt; + +import com.google.common.base.Stopwatch; +import io.netty.channel.Channel; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.consumer.ReceiptHandle; +import org.apache.rocketmq.common.state.StateEventListener; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.RenewEvent; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class ReceiptHandleManager extends AbstractStartAndShutdown { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final MetadataService metadataService; + protected final ConsumerManager consumerManager; + protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; + protected final StateEventListener<RenewEvent> eventListener; + protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); + protected final ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); + protected final ThreadPoolExecutor renewalWorkerService; + + public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { + this.metadataService = metadataService; + this.consumerManager = consumerManager; + this.eventListener = eventListener; + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( + proxyConfig.getRenewThreadPoolNums(), + proxyConfig.getRenewMaxThreadPoolNums(), + 1, TimeUnit.MINUTES, + "RenewalWorkerThread", + proxyConfig.getRenewThreadPoolQueueCapacity() + ); + consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { + @Override + public void handle(ConsumerGroupEvent event, String group, Object... args) { + if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { + if (args == null || args.length < 1) { + return; + } + if (args[0] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { + // if the channel sync from other proxy is expired, not to clear data of connect to current proxy + return; + } + clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); + log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); + } + } + } + + @Override + public void shutdown() { + + } + }); + this.receiptHandleGroupMap = new ConcurrentHashMap<>(); + this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); + this.appendStartAndShutdown(new StartAndShutdown() { + @Override + public void start() throws Exception { + scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, + ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() throws Exception { + scheduledExecutorService.shutdown(); + clearAllHandle(); + } + }); + } + + public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { + ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), + k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); + } + + public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) { + ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); + if (handleGroup == null) { + return null; + } + return handleGroup.remove(msgID, receiptHandle); + } + + protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { + return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; + } + + public void scheduleRenewTask() { + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { + ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); + if (clientIsOffline(key)) { + clearGroup(key); + continue; + } + + ReceiptHandleGroup group = entry.getValue(); + group.scan((msgID, handleStr, v) -> { + long current = System.currentTimeMillis(); + ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); + if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { + return; + } + renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); + }); + } + } catch (Exception e) { + log.error("unexpect error when schedule renew task", e); + } + + log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); + } + + protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { + try { + group.computeIfPresent(msgID, handleStr, this::startRenewMessage); + } catch (Exception e) { + log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); + } + } + + protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { + CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + long current = System.currentTimeMillis(); + try { + if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { + log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); + return CompletableFuture.completedFuture(null); + } + if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when renew. handle:{}", messageReceiptHandle, throwable); + if (renewExceptionNeedRetry(throwable)) { + messageReceiptHandle.incrementAndGetRenewRetryTimes(); + resFuture.complete(messageReceiptHandle); + } else { + resFuture.complete(null); + } + } else if (AckStatus.OK.equals(ackResult.getStatus())) { + messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); + messageReceiptHandle.resetRenewRetryTimes(); + messageReceiptHandle.incrementRenewTimes(); + resFuture.complete(messageReceiptHandle); + } else { + log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); + resFuture.complete(null); + } + }); + } else { + ProxyContext context = createContext("RenewMessage"); + SubscriptionGroupConfig subscriptionGroupConfig = + metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); + if (subscriptionGroupConfig == null) { + log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); + return CompletableFuture.completedFuture(null); + } + RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); + } + resFuture.complete(null); + }); + } + } catch (Throwable t) { + log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); + resFuture.complete(null); + } + return resFuture; + } + + protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { + if (key == null) { + return; + } + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); + if (handleGroup == null) { + return; + } + handleGroup.scan((msgID, handle, v) -> { + try { + handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); + return CompletableFuture.completedFuture(null); + }); + } catch (Exception e) { + log.error("error when clear handle for group. key:{}", key, e); + } + }); + } + + public void clearAllHandle() { + log.info("start clear all handle in receiptHandleProcessor"); + Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); + for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { + clearGroup(key); + } + log.info("clear all handle in receiptHandleProcessor done"); + } + + protected boolean renewExceptionNeedRetry(Throwable t) { + t = ExceptionUtils.getRealException(t); + if (t instanceof ProxyException) { + ProxyException proxyException = (ProxyException) t; + if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || + ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { + return false; + } + } + return true; + } + + protected ProxyContext createContext(String actionName) { + return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java index 4df834bb6..49fdfc6a8 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java @@ -47,7 +47,7 @@ public class AckMessageActivityTest extends BaseActivityTest { @Before public void before() throws Throwable { super.before(); - this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } @Test diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java index fdd052da7..2de9a066b 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java @@ -49,7 +49,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest { @Before public void before() throws Throwable { super.before(); - this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, + this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } @@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest { when(this.messagingProcessor.changeInvisibleTime( any(), receiptHandleCaptor.capture(), anyString(), anyString(), anyString(), invisibleTimeArgumentCaptor.capture() )).thenReturn(CompletableFuture.completedFuture(ackResult)); - when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) + when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); ChangeInvisibleDurationResponse response = this.changeInvisibleDurationActivity.changeInvisibleDuration( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 535af838c..2e562504a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -74,7 +74,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { public void before() throws Throwable { super.before(); ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0); - this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, + this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java index ec620340c..87824e5b4 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java @@ -44,7 +44,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest { @Before public void before() throws Throwable { super.before(); - this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor,receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } @Test @@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest { .thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, ""))); String savedHandleStr = buildReceiptHandle("topic", System.currentTimeMillis(),3000); - when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) + when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); ForwardMessageToDeadLetterQueueResponse response = this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java index bfa2cc3e6..717e86fc0 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java @@ -73,7 +73,6 @@ public class ConsumerProcessorTest extends BaseProcessorTest { @Before public void before() throws Throwable { super.before(); - ReceiptHandleProcessor receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); this.consumerProcessor = new ConsumerProcessor(messagingProcessor, serviceManager, Executors.newCachedThreadPool()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java similarity index 63% rename from proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java index c76f40f92..877c9fd6f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java @@ -15,21 +15,10 @@ * limitations under the License. */ -package org.apache.rocketmq.proxy.processor; +package org.apache.rocketmq.proxy.service.receipt; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelMetadata; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelProgressivePromise; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; -import io.netty.util.Attribute; -import io.netty.util.AttributeKey; -import java.net.SocketAddress; +import io.netty.channel.local.LocalChannel; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -38,26 +27,34 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.state.StateEventListener; +import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; -import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +import org.apache.rocketmq.proxy.service.BaseServiceTest; +import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -65,8 +62,14 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class ReceiptHandleProcessorTest extends BaseProcessorTest { - private ReceiptHandleProcessor receiptHandleProcessor; +public class ReceiptHandleManagerTest extends BaseServiceTest { + private ReceiptHandleManager receiptHandleManager; + @Mock + protected MessagingProcessor messagingProcessor; + @Mock + protected MetadataService metadataService; + @Mock + protected ConsumerManager consumerManager; private static final ProxyContext PROXY_CONTEXT = ProxyContext.create(); private static final String GROUP = "group"; @@ -84,6 +87,22 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { @Before public void setup() { + receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { + @Override + public void fireEvent(RenewEvent event) { + MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); + ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); + messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, messageReceiptHandle.getMessageId(), + messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) + .whenComplete((v, t) -> { + if (t != null) { + event.getFuture().completeExceptionally(t); + return; + } + event.getFuture().complete(v); + }); + } + }); ProxyConfig config = ConfigurationManager.getProxyConfig(); receiptHandle = ReceiptHandle.builder() .startOffset(0L) @@ -97,20 +116,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { .commitLogOffset(0L) .build().encode(); PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id"); - PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new MockChannel()); - receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); - Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class)); + PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel()); + Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class)); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); } @Test public void testAddReceiptHandle() { - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = new LocalChannel(); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.scheduleRenewTask(); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); @@ -134,12 +152,12 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { .build().encode(); MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); } - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.scheduleRenewTask(); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.scheduleRenewTask(); ArgumentCaptor<ReceiptHandle> handleArgumentCaptor = ArgumentCaptor.forClass(ReceiptHandle.class); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), handleArgumentCaptor.capture(), Mockito.eq(MESSAGE_ID), @@ -152,10 +170,10 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { public void testRenewReceiptHandle() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); long newInvisibleTime = 18000L; ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder() @@ -179,27 +197,26 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { ackResult.setExtraInfo(newReceiptHandle); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())))) .thenReturn(CompletableFuture.completedFuture(ackResult)); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID), Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID), Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet()))); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); } @Test public void testRenewExceedMaxRenewTimes() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); @@ -207,13 +224,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { RetryPolicy retryPolicy = new RenewStrategyPolicy(); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) .thenReturn(ackResultFuture); await().atMost(Duration.ofSeconds(1)).until(() -> { - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); try { - ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); return receiptHandleGroup.isEmpty(); } catch (Exception e) { return false; @@ -228,19 +245,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { @Test public void testRenewWithInvalidHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) .thenReturn(ackResultFuture); await().atMost(Duration.ofSeconds(1)).until(() -> { - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); try { - ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); return receiptHandleGroup.isEmpty(); } catch (Exception e) { return false; @@ -252,8 +269,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { public void testRenewWithErrorThenOK() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); AtomicInteger count = new AtomicInteger(0); List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); @@ -297,13 +314,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> { return futureList.get(count.getAndIncrement()); }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); } await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> { - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); try { - ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); return receiptHandleGroup.isEmpty(); } catch (Exception e) { return false; @@ -331,19 +348,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) .thenReturn(CompletableFuture.completedFuture(new AckResult())); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(RECONSUME_TIMES))); await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { - ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); assertTrue(receiptHandleGroup.isEmpty()); }); } @@ -365,15 +382,15 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) .thenReturn(CompletableFuture.completedFuture(new AckResult())); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); await().atMost(Duration.ofSeconds(1)).until(() -> { try { - ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); return receiptHandleGroup.isEmpty(); } catch (Exception e) { return false; @@ -401,11 +418,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); - Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.scheduleRenewTask(); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); @@ -414,11 +431,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { @Test public void testRemoveReceiptHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); @@ -427,11 +444,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { @Test public void testClearGroup() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleProcessor.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); - receiptHandleProcessor.scheduleRenewTask(); + receiptHandleManager.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getInvisibleTimeMillisWhenClear())); @@ -440,242 +457,10 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { @Test public void testClientOffline() { ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); - Mockito.verify(messagingProcessor, Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture()); + Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); - assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty()); - } - - class MockChannel implements Channel { - @Override - public ChannelId id() { - return new ChannelId() { - @Override - public String asShortText() { - return "short"; - } - - @Override - public String asLongText() { - return "long"; - } - - @Override - public int compareTo(ChannelId o) { - return 1; - } - }; - } - - @Override - public EventLoop eventLoop() { - return null; - } - - @Override - public Channel parent() { - return null; - } - - @Override - public ChannelConfig config() { - return null; - } - - @Override - public boolean isOpen() { - return false; - } - - @Override - public boolean isRegistered() { - return false; - } - - @Override - public boolean isActive() { - return false; - } - - @Override - public ChannelMetadata metadata() { - return null; - } - - @Override - public SocketAddress localAddress() { - return null; - } - - @Override - public SocketAddress remoteAddress() { - return null; - } - - @Override - public ChannelFuture closeFuture() { - return null; - } - - @Override - public boolean isWritable() { - return false; - } - - @Override - public long bytesBeforeUnwritable() { - return 0; - } - - @Override - public long bytesBeforeWritable() { - return 0; - } - - @Override - public Unsafe unsafe() { - return null; - } - - @Override - public ChannelPipeline pipeline() { - return null; - } - - @Override - public ByteBufAllocator alloc() { - return null; - } - - @Override - public ChannelFuture bind(SocketAddress localAddress) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return null; - } - - @Override - public ChannelFuture disconnect() { - return null; - } - - @Override - public ChannelFuture close() { - return null; - } - - @Override - public ChannelFuture deregister() { - return null; - } - - @Override - public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture disconnect(ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture close(ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture deregister(ChannelPromise promise) { - return null; - } - - @Override - public Channel read() { - return null; - } - - @Override - public ChannelFuture write(Object msg) { - return null; - } - - @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { - return null; - } - - @Override - public Channel flush() { - return null; - } - - @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture writeAndFlush(Object msg) { - return null; - } - - @Override - public ChannelPromise newPromise() { - return null; - } - - @Override - public ChannelProgressivePromise newProgressivePromise() { - return null; - } - - @Override - public ChannelFuture newSucceededFuture() { - return null; - } - - @Override - public ChannelFuture newFailedFuture(Throwable cause) { - return null; - } - - @Override - public ChannelPromise voidPromise() { - return null; - } - - @Override - public <T> Attribute<T> attr(AttributeKey<T> key) { - return null; - } - - @Override - public <T> boolean hasAttr(AttributeKey<T> key) { - return false; - } - - @Override - public int compareTo(Channel o) { - return 1; - } + assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } -} +} \ No newline at end of file -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2