Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch035-backport-fix-some-bug...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch035-backport-fix-some-bugs.patch of Package rocketmq
From 1be5ebc7363e4bc6503c80688160a354f5a12f78 Mon Sep 17 00:00:00 2001 From: Zhanhui Li <lizhanhui@apache.org> Date: Mon, 13 Nov 2023 09:45:37 +0800 Subject: [PATCH 1/5] [ISSUE #7551] Reuse helper methods from Netty to free direct byte buffer (#7550) * Reuse helper methods from Netty to free direct byte buffer, making codebase JDK 9+ compatible Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * Guard against null Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * fix #7552 Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> --- .../org/apache/rocketmq/common/UtilAll.java | 60 +------------------ .../apache/rocketmq/common/UtilAllTest.java | 10 ---- 2 files changed, 3 insertions(+), 67 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 95b6b09b4..2808f106a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -16,21 +16,18 @@ */ package org.apache.rocketmq.common; +import io.netty.util.internal.PlatformDependent; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -46,15 +43,11 @@ import java.util.function.Supplier; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import org.apache.commons.lang3.JavaVersion; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; public class UtilAll { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -707,57 +700,10 @@ public class UtilAll { } public static void cleanBuffer(final ByteBuffer buffer) { - if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) { + if (null == buffer) { return; } - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - try { - Field field = Unsafe.class.getDeclaredField("theUnsafe"); - field.setAccessible(true); - Unsafe unsafe = (Unsafe) field.get(null); - Method cleaner = method(unsafe, "invokeCleaner", new Class[] {ByteBuffer.class}); - cleaner.invoke(unsafe, viewed(buffer)); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } else { - invoke(invoke(viewed(buffer), "cleaner"), "clean"); - } - } - - public static Object invoke(final Object target, final String methodName, final Class<?>... args) { - return AccessController.doPrivileged(new PrivilegedAction<Object>() { - @Override - public Object run() { - try { - Method method = method(target, methodName, args); - method.setAccessible(true); - return method.invoke(target); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - }); - } - - public static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException { - try { - return target.getClass().getMethod(methodName, args); - } catch (NoSuchMethodException e) { - return target.getClass().getDeclaredMethod(methodName, args); - } - } - - private static ByteBuffer viewed(ByteBuffer buffer) { - if (!buffer.isDirect()) { - throw new IllegalArgumentException("buffer is non-direct"); - } - ByteBuffer viewedBuffer = (ByteBuffer) ((DirectBuffer) buffer).attachment(); - if (viewedBuffer == null) { - return buffer; - } else { - return viewed(viewedBuffer); - } + PlatformDependent.freeDirectBuffer(buffer); } public static void ensureDirOK(final String dirName) { diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java index 94bb390eb..cb288578c 100644 --- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java @@ -219,16 +219,6 @@ public class UtilAllTest { UtilAll.cleanBuffer(ByteBuffer.allocate(0)); } - @Test(expected = NoSuchMethodException.class) - public void testMethod() throws NoSuchMethodException { - UtilAll.method(new Object(), "noMethod", null); - } - - @Test(expected = IllegalStateException.class) - public void testInvoke() throws Exception { - UtilAll.invoke(new Object(), "noMethod"); - } - @Test public void testCalculateFileSizeInPath() throws Exception { /** -- 2.32.0.windows.2 From 4791d9a1f1a7c39e005da15f228473c04eafd007 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 14 Nov 2023 17:59:47 +0800 Subject: [PATCH 2/5] [ISSUE #5923] Revert "Fix tiered store README.md error about Configuration (#7436)" (#7557) This reverts commit 70dc93abbcb9bf161378d66fcaca55bedc78b905. --- .../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++----- .../tieredstore/provider/posix/PosixFileSegment.java | 4 ++-- .../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +- .../provider/posix/PosixFileSegmentTest.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java index a112ea6b1..595db6b86 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java @@ -115,7 +115,7 @@ public class TieredMessageStoreConfig { private long readAheadCacheExpireDuration = 10 * 1000; private double readAheadCacheSizeThresholdRate = 0.3; - private String tieredStoreFilepath = ""; + private String tieredStoreFilePath = ""; private String objectStoreEndpoint = ""; @@ -350,12 +350,12 @@ public class TieredMessageStoreConfig { this.readAheadCacheSizeThresholdRate = rate; } - public String getTieredStoreFilepath() { - return tieredStoreFilepath; + public String getTieredStoreFilePath() { + return tieredStoreFilePath; } - public void setTieredStoreFilepath(String tieredStoreFilepath) { - this.tieredStoreFilepath = tieredStoreFilepath; + public void setTieredStoreFilePath(String tieredStoreFilePath) { + this.tieredStoreFilePath = tieredStoreFilePath; } public void setObjectStoreEndpoint(String objectStoreEndpoint) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 708ce33f9..7e949cb28 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment { super(storeConfig, fileType, filePath, baseOffset); // basePath - String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(), - StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator)); + String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(), + StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator)); // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset String brokerClusterName = storeConfig.getBrokerClusterName(); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java index 80cdba977..6693d3cb7 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java @@ -49,7 +49,7 @@ public class TieredCommitLogTest { TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); storeConfig.setBrokerName("brokerName"); storeConfig.setStorePathRootDir(storePath); - storeConfig.setTieredStoreFilepath(storePath + File.separator); + storeConfig.setTieredStoreFilePath(storePath + File.separator); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); storeConfig.setCommitLogRollingInterval(0); storeConfig.setTieredStoreCommitLogMaxSize(1000); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java index ede62b8ce..db33ae847 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java @@ -42,7 +42,7 @@ public class PosixFileSegmentTest { @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setTieredStoreFilepath(storePath); + storeConfig.setTieredStoreFilePath(storePath); mq = new MessageQueue("OSSFileSegmentTest", "broker", 0); TieredStoreExecutor.init(); } -- 2.32.0.windows.2 From 651a5ca992988b90c7e4884e9975db0938557def Mon Sep 17 00:00:00 2001 From: Jixiang Jin <lollipop@apache.org> Date: Thu, 16 Nov 2023 10:16:16 +0800 Subject: [PATCH 3/5] [ISSUE #7562] BugFix for estimating message accumulation correctly (#7563) --- .../broker/metrics/ConsumerLagCalculator.java | 11 +++++--- .../proxy/common/utils/FilterUtilTest.java | 25 +++++++++++++++++++ .../remoting/protocol/filter/FilterAPI.java | 8 ++++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java index 7a5f1f765..af08a83c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData; @@ -435,10 +436,12 @@ public class ConsumerLagCalculator { if (subscriptionGroupConfig != null) { for (SimpleSubscriptionData simpleSubscriptionData : subscriptionGroupConfig.getSubscriptionDataSet()) { if (topic.equals(simpleSubscriptionData.getTopic())) { - subscriptionData = new SubscriptionData(); - subscriptionData.setTopic(simpleSubscriptionData.getTopic()); - subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType()); - subscriptionData.setSubString(simpleSubscriptionData.getExpression()); + try { + subscriptionData = FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(), + simpleSubscriptionData.getExpression(), simpleSubscriptionData.getExpressionType()); + } catch (Exception e) { + LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e); + } break; } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java index 23389e9d3..7c9d84015 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java @@ -48,4 +48,29 @@ public class FilterUtilTest { assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), null)).isFalse(); } + @Test + public void testBuildSubscriptionData() throws Exception { + // Test case 1: expressionType is null, will use TAG as default. + String topic = "topic"; + String subString = "substring"; + String expressionType = null; + SubscriptionData result = FilterAPI.buildSubscriptionData(topic, subString, expressionType); + assertThat(result).isNotNull(); + assertThat(topic).isEqualTo(result.getTopic()); + assertThat(subString).isEqualTo(result.getSubString()); + assertThat(result.getExpressionType()).isEqualTo("TAG"); + assertThat(result.getCodeSet().size()).isEqualTo(1); + + // Test case 2: expressionType is not null + topic = "topic"; + subString = "substring1||substring2"; + expressionType = "SQL92"; + result = FilterAPI.buildSubscriptionData(topic, subString, expressionType); + assertThat(result).isNotNull(); + assertThat(topic).isEqualTo(result.getTopic()); + assertThat(subString).isEqualTo(result.getSubString()); + assertThat(result.getExpressionType()).isEqualTo(expressionType); + assertThat(result.getCodeSet().size()).isEqualTo(2); + } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java index 10a6bb463..f291bfccf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java @@ -46,6 +46,14 @@ public class FilterAPI { return subscriptionData; } + public static SubscriptionData buildSubscriptionData(String topic, String subString, String expressionType) throws Exception { + final SubscriptionData subscriptionData = buildSubscriptionData(topic, subString); + if (StringUtils.isNotBlank(expressionType)) { + subscriptionData.setExpressionType(expressionType); + } + return subscriptionData; + } + public static SubscriptionData build(final String topic, final String subString, final String type) throws Exception { if (ExpressionType.TAG.equals(type) || type == null) { -- 2.32.0.windows.2 From 01a2aef96bdfb17c5f82415141ef421efb4e3bc7 Mon Sep 17 00:00:00 2001 From: cnScarb <jjhfen00@163.com> Date: Fri, 17 Nov 2023 15:58:14 +0800 Subject: [PATCH 4/5] [ISSUE #7570] Add default value for lastPopTimestamp (#7571) --- .../apache/rocketmq/client/impl/consumer/PopProcessQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java index 3b39b86cc..50827545b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java @@ -26,7 +26,7 @@ public class PopProcessQueue { private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); - private long lastPopTimestamp; + private long lastPopTimestamp = System.currentTimeMillis(); private AtomicInteger waitAckCounter = new AtomicInteger(0); private volatile boolean dropped = false; -- 2.32.0.windows.2 From 8e7e2b5f50e0db14b77462ef1574d4020c0fd986 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Mon, 20 Nov 2023 19:32:57 +0800 Subject: [PATCH 5/5] [ISSUE #7574] Fix RunningFlags conflict Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- store/src/main/java/org/apache/rocketmq/store/RunningFlags.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java index 91fcb155a..88b398a77 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java @@ -30,7 +30,7 @@ public class RunningFlags { private static final int FENCED_BIT = 1 << 5; - private static final int LOGIC_DISK_FULL_BIT = 1 << 5; + private static final int LOGIC_DISK_FULL_BIT = 1 << 6; private volatile int flagBits = 0; -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2