Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch034-backport-Let-consumer...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch of Package rocketmq
From 27759f3556c279f63c13bc94fe3ad6ca55558114 Mon Sep 17 00:00:00 2001 From: Allon Murienik <mureinik@gmail.com> Date: Thu, 9 Nov 2023 06:33:34 +0200 Subject: [PATCH 1/2] Fix unstable UtilAllTest#testCalculateFileSizeInPath on Windows (#7419) This patch offers an alternative approach to 5d492c338258d07613103e6ae16df4c6fa5b3838. Instead of manually setting up the directory #testCalculateFileSizeInPath needs and then recursively deleting it, it uses JUnit's TemporaryFolder Rule to handle all of this work, and allows the code to concentrate on the business logic. Closes #7418 --- .../apache/rocketmq/common/UtilAllTest.java | 86 ++++++++----------- 1 file changed, 36 insertions(+), 50 deletions(-) diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java index a0653d7fc..94bb390eb 100644 --- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java @@ -26,7 +26,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; + +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.within; @@ -34,6 +37,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class UtilAllTest { + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); @Test public void testCurrentStackTrace() { @@ -236,56 +241,37 @@ public class UtilAllTest { * - file_1_2_0 * - dir_2 */ - String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath"; - File baseFile = new File(basePath); - try { - // test empty path - assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile)); - - // create baseDir - assertTrue(baseFile.mkdirs()); - - File file0 = new File(baseFile, "file_0"); - assertTrue(file0.createNewFile()); - writeFixedBytesToFile(file0, 1313); - - assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile)); - - // build a file tree like above - File dir1 = new File(baseFile, "dir_1"); - dir1.mkdirs(); - File file10 = new File(dir1, "file_1_0"); - File file11 = new File(dir1, "file_1_1"); - File dir12 = new File(dir1, "dir_1_2"); - dir12.mkdirs(); - File file120 = new File(dir12, "file_1_2_0"); - File dir2 = new File(baseFile, "dir_2"); - dir2.mkdirs(); - - // write all file with 1313 bytes data - assertTrue(file10.createNewFile()); - writeFixedBytesToFile(file10, 1313); - assertTrue(file11.createNewFile()); - writeFixedBytesToFile(file11, 1313); - assertTrue(file120.createNewFile()); - writeFixedBytesToFile(file120, 1313); - - assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile)); - } finally { - deleteFolder(baseFile); - } - } - - public static void deleteFolder(File folder) { - if (folder.isDirectory()) { - File[] files = folder.listFiles(); - if (files != null) { - for (File file : files) { - deleteFolder(file); - } - } - } - folder.delete(); + File baseFile = tempDir.getRoot(); + + // test empty path + assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile)); + + File file0 = new File(baseFile, "file_0"); + assertTrue(file0.createNewFile()); + writeFixedBytesToFile(file0, 1313); + + assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile)); + + // build a file tree like above + File dir1 = new File(baseFile, "dir_1"); + dir1.mkdirs(); + File file10 = new File(dir1, "file_1_0"); + File file11 = new File(dir1, "file_1_1"); + File dir12 = new File(dir1, "dir_1_2"); + dir12.mkdirs(); + File file120 = new File(dir12, "file_1_2_0"); + File dir2 = new File(baseFile, "dir_2"); + dir2.mkdirs(); + + // write all file with 1313 bytes data + assertTrue(file10.createNewFile()); + writeFixedBytesToFile(file10, 1313); + assertTrue(file11.createNewFile()); + writeFixedBytesToFile(file11, 1313); + assertTrue(file120.createNewFile()); + writeFixedBytesToFile(file120, 1313); + + assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile)); } private void writeFixedBytesToFile(File file, int size) throws Exception { -- 2.32.0.windows.2 From 2ed27214d84799c62a6f3180d2b01075412e4ef8 Mon Sep 17 00:00:00 2001 From: Zhanhui Li <lizhanhui@apache.org> Date: Mon, 13 Nov 2023 09:44:25 +0800 Subject: [PATCH 2/2] [ISSUE #7547] Let consumer be aware of message queue assignment change (#7548) * let consumer be aware of message queue assignment change Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * add unit test for DefaultMQPushConsumer#setMessageQueueListener Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * fix: bazel build warnings Signed-off-by: Zhanhui Li <lizhanhui@apache.org> * fix: set MixCommitlogTest test size as medium Signed-off-by: Zhanhui Li <lizhanhui@apache.org> * allow cache bazel test results Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * fix code style issue by removing unused imports Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * fix #7552 Signed-off-by: Zhanhui Li <lizhanhui@apache.org> --------- Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> Signed-off-by: Zhanhui Li <lizhanhui@apache.org> --- .../client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++ .../rocketmq/client/consumer/MQConsumer.java | 8 +++++--- .../client/consumer/MessageQueueListener.java | 5 ++--- .../consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++- .../client/impl/consumer/RebalancePushImpl.java | 8 +++++++- .../service/message/LocalRemotingCommand.java | 1 + .../balance/NormalMsgDynamicBalanceIT.java | 17 +++++++++++++++++ 7 files changed, 54 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 1afb9113e..e593a17c9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private MessageListener messageListener; + /** + * Listener to call if message queue assignment is changed. + */ + private MessageQueueListener messageQueueListener; + /** * Offset Storage */ @@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public void setClientRebalance(boolean clientRebalance) { this.clientRebalance = clientRebalance; } + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + public void setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index f4a8eda23..81e06ee41 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -29,20 +29,22 @@ import org.apache.rocketmq.remoting.exception.RemotingException; */ public interface MQConsumer extends MQAdmin { /** - * If consuming failure,message will be send back to the brokers,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ @Deprecated void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * If consuming failure,message will be send back to the broker,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * Fetch message queues from consumer cache according to the topic + * Fetch message queues from consumer cache pertaining to the given topic. * * @param topic message topic * @return queue set diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java index 63795a6ee..74510f4c3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java @@ -26,8 +26,7 @@ public interface MessageQueueListener { /** * @param topic message topic * @param mqAll all queues in this message topic - * @param mqDivided collection of queues,assigned to the current consumer + * @param mqAssigned collection of queues, assigned to the current consumer */ - void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, - final Set<MessageQueue> mqDivided); + void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e57579321..cfb89b5c8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.client.consumer.PopResult; @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long queueMaxSpanFlowControlTimes = 0; //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h - private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; private static final int MAX_POP_INVISIBLE_TIME = 300000; private static final int MIN_POP_INVISIBLE_TIME = 5000; @@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { int[] getPopDelayLevel() { return popDelayLevel; } + + public MessageQueueListener getMessageQueueListener() { + if (null == defaultMQPushConsumer) { + return null; + } + return defaultMQPushConsumer.getMessageQueueListener(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index df509f371..f9cf429c6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - /** + /* * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ @@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl { // notify broker this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true); + + MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener(); + if (null != messageQueueListener) { + messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); + } } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java index 915cafcd5..7bf4a1698 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java @@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand { cmd.writeCustomHeader(customHeader); cmd.setExtFields(new HashMap<>()); setCmdVersion(cmd); + cmd.makeCustomHeaderToNet(); return cmd; } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java index b2c9b0658..684b718ae 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.test.client.consumer.balance; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.test.base.BaseConf; @@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { consumer2.getListener().getAllUndupMsgBody()).size()); assertThat(balance).isEqualTo(true); } + + @Test + public void testMessageQueueListener() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener()); + // Register message queue listener + consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown()); + + // Without message queue listener + RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListener()); + + Assert.assertTrue(latch.await(30, TimeUnit.SECONDS)); + } } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2