Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch011-backport-optimize-con...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch011-backport-optimize-config.patch of Package rocketmq
From 50d1050437ed8748f86ee50261b50a1e1f63162e Mon Sep 17 00:00:00 2001 From: Jixiang Jin <lollipop@apache.org> Date: Wed, 16 Aug 2023 21:15:00 +0800 Subject: [PATCH 1/7] To config the cardinalityLimit for openTelemetry metrics exporting and fix logging config for metrics (#7196) --- WORKSPACE | 14 +++--- .../broker/metrics/BrokerMetricsManager.java | 47 ++++++++++++++----- .../broker/metrics/PopMetricsManager.java | 11 +++-- .../src/main/resources/rmq.broker.logback.xml | 17 ++++--- .../apache/rocketmq/common/BrokerConfig.java | 9 ++++ .../metrics/ControllerMetricsManager.java | 6 +-- pom.xml | 4 +- .../metrics/RemotingMetricsManager.java | 10 ++-- .../rocketmq/store/DefaultMessageStore.java | 24 +++++----- .../apache/rocketmq/store/MessageStore.java | 6 +-- .../metrics/DefaultStoreMetricsManager.java | 4 +- .../plugin/AbstractPluginMessageStore.java | 6 +-- .../tieredstore/TieredMessageStore.java | 6 +-- .../metrics/TieredStoreMetricsManager.java | 23 +++++---- 14 files changed, 110 insertions(+), 77 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index a8a0aafe9..3126f2d1d 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -88,14 +88,14 @@ maven_install( "io.grpc:grpc-api:1.47.0", "io.grpc:grpc-testing:1.47.0", "org.springframework:spring-core:5.3.26", - "io.opentelemetry:opentelemetry-exporter-otlp:1.19.0", - "io.opentelemetry:opentelemetry-exporter-prometheus:1.19.0-alpha", - "io.opentelemetry:opentelemetry-exporter-logging:1.19.0", - "io.opentelemetry:opentelemetry-sdk:1.19.0", + "io.opentelemetry:opentelemetry-exporter-otlp:1.29.0", + "io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha", + "io.opentelemetry:opentelemetry-exporter-logging:1.29.0", + "io.opentelemetry:opentelemetry-sdk:1.29.0", "com.squareup.okio:okio-jvm:3.0.0", - "io.opentelemetry:opentelemetry-api:1.19.0", - "io.opentelemetry:opentelemetry-sdk-metrics:1.19.0", - "io.opentelemetry:opentelemetry-sdk-common:1.19.0", + "io.opentelemetry:opentelemetry-api:1.29.0", + "io.opentelemetry:opentelemetry-sdk-metrics:1.29.0", + "io.opentelemetry:opentelemetry-sdk-common:1.29.0", "io.github.aliyunmq:rocketmq-slf4j-api:1.0.0", "io.github.aliyunmq:rocketmq-logback-classic:1.0.0", "org.slf4j:jul-to-slf4j:2.0.6", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index f0b76107e..6af5afc14 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -34,8 +34,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.Arrays; @@ -361,22 +363,45 @@ public class BrokerMetricsManager { .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_MESSAGE_SIZE) .build(); - View messageSizeView = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) - .build(); - providerBuilder.registerView(messageSizeSelector, messageSizeView); - - for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) { - providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + ViewBuilder messageSizeViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)); + // To config the cardinalityLimit for openTelemetry metrics exporting. + SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build()); + + for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) { + ViewBuilder viewBuilder = selectorViewPair.getObject2(); + SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); } - for (Pair<InstrumentSelector, View> selectorViewPair : messageStore.getMetricsView()) { - providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : messageStore.getMetricsView()) { + ViewBuilder viewBuilder = selectorViewPair.getObject2(); + SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); } - for (Pair<InstrumentSelector, View> selectorViewPair : PopMetricsManager.getMetricsView()) { - providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : PopMetricsManager.getMetricsView()) { + ViewBuilder viewBuilder = selectorViewPair.getObject2(); + SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); } + + // default view builder for all counter. + InstrumentSelector defaultCounterSelector = InstrumentSelector.builder() + .setType(InstrumentType.COUNTER) + .build(); + ViewBuilder defaultCounterViewBuilder = View.builder().setDescription("default view for counter."); + SdkMeterProviderUtil.setCardinalityLimit(defaultCounterViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(defaultCounterSelector, defaultCounterViewBuilder.build()); + + //default view builder for all observable gauge. + InstrumentSelector defaultGaugeSelector = InstrumentSelector.builder() + .setType(InstrumentType.OBSERVABLE_GAUGE) + .build(); + ViewBuilder defaultGaugeViewBuilder = View.builder().setDescription("default view for gauge."); + SdkMeterProviderUtil.setCardinalityLimit(defaultGaugeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(defaultGaugeSelector, defaultGaugeViewBuilder.build()); } private void initStatsMetrics() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java index 463371d7e..2de220da1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java @@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentSelector; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -63,7 +64,7 @@ public class PopMetricsManager { private static LongCounter popReviveGetTotal = new NopLongCounter(); private static LongCounter popReviveRetryMessageTotal = new NopLongCounter(); - public static List<Pair<InstrumentSelector, View>> getMetricsView() { + public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { List<Double> rpcCostTimeBuckets = Arrays.asList( (double) Duration.ofMillis(1).toMillis(), (double) Duration.ofMillis(10).toMillis(), @@ -76,10 +77,10 @@ public class PopMetricsManager { .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME) .build(); - View popBufferScanTimeConsumeView = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) - .build(); - return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView)); + ViewBuilder popBufferScanTimeConsumeViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)); + + return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder)); } public static void initMetrics(Meter meter, BrokerController brokerController, diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml index 7d49f6664..3c51e59d4 100644 --- a/broker/src/main/resources/rmq.broker.logback.xml +++ b/broker/src/main/resources/rmq.broker.logback.xml @@ -559,27 +559,27 @@ </sift> </appender> - <appender name="RocketmqBrokerMetricsAppender" class="ch.qos.logback.classic.sift.SiftingAppender"> + <appender name="RocketmqBrokerMetricsSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender"> <discriminator> <key>brokerContainerLogDir</key> <defaultValue>${file.separator}</defaultValue> </discriminator> <sift> - <appender name="RocketmqCommercialAppender" + <appender name="RocketmqBrokerMetricsAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file> - ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metric.log + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metrics.log </file> <append>true</append> <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> <fileNamePattern> - ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metric.%i.log.gz + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metrics.%i.log.gz </fileNamePattern> <minIndex>1</minIndex> - <maxIndex>10</maxIndex> + <maxIndex>3</maxIndex> </rollingPolicy> <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>500MB</maxFileSize> + <maxFileSize>512MB</maxFileSize> </triggeringPolicy> <encoder> <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> @@ -588,6 +588,9 @@ </appender> </sift> </appender> + <appender name="RocketmqBrokerMetricsSiftingAppender" class="ch.qos.logback.classic.AsyncAppender"> + <appender-ref ref="RocketmqBrokerMetricsSiftingAppender_inner"/> + </appender> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> @@ -670,7 +673,7 @@ </logger> <logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO"> - <appender-ref ref="RocketmqBrokerMetricsAppender"/> + <appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/> </logger> <root level="INFO"> diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 99a5db5ad..45d26b29c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -350,6 +350,7 @@ public class BrokerConfig extends BrokerIdentity { private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE; + private int metricsOtelCardinalityLimit = 50 * 1000; private String metricsGrpcExporterTarget = ""; private String metricsGrpcExporterHeader = ""; private long metricGrpcExporterTimeOutInMills = 3 * 1000; @@ -1531,6 +1532,14 @@ public class BrokerConfig extends BrokerIdentity { this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType); } + public int getMetricsOtelCardinalityLimit() { + return metricsOtelCardinalityLimit; + } + + public void setMetricsOtelCardinalityLimit(int metricsOtelCardinalityLimit) { + this.metricsOtelCardinalityLimit = metricsOtelCardinalityLimit; + } + public String getMetricsGrpcExporterTarget() { return metricsGrpcExporterTarget; } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java index 9b30a3b43..650740bcc 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java @@ -203,7 +203,7 @@ public class ControllerMetricsManager { 10 * s ); - View latecyView = View.builder() + View latencyView = View.builder() .setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets)) .build(); @@ -217,8 +217,8 @@ public class ControllerMetricsManager { .setName(HISTOGRAM_DLEDGER_OP_LATENCY) .build(); - providerBuilder.registerView(requestLatencySelector, latecyView); - providerBuilder.registerView(dLedgerOpLatencySelector, latecyView); + providerBuilder.registerView(requestLatencySelector, latencyView); + providerBuilder.registerView(dLedgerOpLatencySelector, latencyView); } private void initMetric(Meter meter) { diff --git a/pom.xml b/pom.xml index 3a08d75f2..9f0b3eb96 100644 --- a/pom.xml +++ b/pom.xml @@ -133,8 +133,8 @@ <caffeine.version>2.9.3</caffeine.version> <spring.version>5.3.27</spring.version> <okio-jvm.version>3.0.0</okio-jvm.version> - <opentelemetry.version>1.26.0</opentelemetry.version> - <opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version> + <opentelemetry.version>1.29.0</opentelemetry.version> + <opentelemetry-exporter-prometheus.version>1.29.0-alpha</opentelemetry-exporter-prometheus.version> <jul-to-slf4j.version>2.0.6</jul-to-slf4j.version> <s3.version>2.20.29</s3.version> <rocksdb.version>1.0.3</rocksdb.version> diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java index 34136f94f..2e0d70856 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java @@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentSelector; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -61,7 +62,7 @@ public class RemotingMetricsManager { .build(); } - public static List<Pair<InstrumentSelector, View>> getMetricsView() { + public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { List<Double> rpcCostTimeBuckets = Arrays.asList( (double) Duration.ofMillis(1).toMillis(), (double) Duration.ofMillis(3).toMillis(), @@ -77,10 +78,9 @@ public class RemotingMetricsManager { .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_RPC_LATENCY) .build(); - View view = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) - .build(); - return Lists.newArrayList(new Pair<>(selector, view)); + ViewBuilder viewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)); + return Lists.newArrayList(new Pair<>(selector, viewBuilder)); } public static String getWriteAndFlushResult(Future<?> future) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 25e4a166f..6115ead59 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -22,7 +22,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -42,23 +42,24 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.AbstractBrokerRunnable; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.MixAll; @@ -82,7 +83,6 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.common.utils.ServiceProvider; -import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -3268,7 +3268,7 @@ public class DefaultMessageStore implements MessageStore { } @Override - public List<Pair<InstrumentSelector, View>> getMetricsView() { + public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { return DefaultStoreMetricsManager.getMetricsView(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 31bbb907f..989cbbe31 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -19,8 +19,7 @@ package org.apache.rocketmq.store; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.View; - +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; @@ -28,7 +27,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; - import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.SystemClock; @@ -964,7 +962,7 @@ public interface MessageStore { * * @return List of metrics selector and view pair */ - List<Pair<InstrumentSelector, View>> getMetricsView(); + List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView(); /** * Init store metrics diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java index ff87f6369..45a6bbc68 100644 --- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java @@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.io.File; import java.util.List; import java.util.function.Supplier; @@ -69,7 +69,7 @@ public class DefaultStoreMetricsManager { public static LongCounter timerDequeueTotal = new NopLongCounter(); public static LongCounter timerEnqueueTotal = new NopLongCounter(); - public static List<Pair<InstrumentSelector, View>> getMetricsView() { + public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { return Lists.newArrayList(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 25e947512..ab9fc6da7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -20,8 +20,7 @@ package org.apache.rocketmq.store.plugin; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.View; - +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; @@ -29,7 +28,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; - import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.message.MessageExt; @@ -643,7 +641,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore { } @Override - public List<Pair<InstrumentSelector, View>> getMetricsView() { + public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { return next.getMetricsView(); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index ced1fb818..5240ac8e9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -21,7 +21,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -352,8 +352,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore { } @Override - public List<Pair<InstrumentSelector, View>> getMetricsView() { - List<Pair<InstrumentSelector, View>> res = super.getMetricsView(); + public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { + List<Pair<InstrumentSelector, ViewBuilder>> res = super.getMetricsView(); res.addAll(TieredStoreMetricsManager.getMetricsView()); return res; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java index 3ca0fb614..d8a07f0a7 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java @@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentSelector; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -101,8 +102,8 @@ public class TieredStoreMetricsManager { public static ObservableLongGauge storageSize = new NopObservableLongGauge(); public static ObservableLongGauge storageMessageReserveTime = new NopObservableLongGauge(); - public static List<Pair<InstrumentSelector, View>> getMetricsView() { - ArrayList<Pair<InstrumentSelector, View>> res = new ArrayList<>(); + public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() { + ArrayList<Pair<InstrumentSelector, ViewBuilder>> res = new ArrayList<>(); InstrumentSelector providerRpcLatencySelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) @@ -114,10 +115,9 @@ public class TieredStoreMetricsManager { .setName(HISTOGRAM_API_LATENCY) .build(); - View rpcLatencyView = View.builder() + ViewBuilder rpcLatencyViewBuilder = View.builder() .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d, 7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000))) - .setDescription("tiered_store_rpc_latency_view") - .build(); + .setDescription("tiered_store_rpc_latency_view"); InstrumentSelector uploadBufferSizeSelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) @@ -129,15 +129,14 @@ public class TieredStoreMetricsManager { .setName(HISTOGRAM_DOWNLOAD_BYTES) .build(); - View bufferSizeView = View.builder() + ViewBuilder bufferSizeViewBuilder = View.builder() .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d * TieredStoreUtil.KB, 10d * TieredStoreUtil.KB, 100d * TieredStoreUtil.KB, 1d * TieredStoreUtil.MB, 10d * TieredStoreUtil.MB, 32d * TieredStoreUtil.MB, 50d * TieredStoreUtil.MB, 100d * TieredStoreUtil.MB))) - .setDescription("tiered_store_buffer_size_view") - .build(); + .setDescription("tiered_store_buffer_size_view"); - res.add(new Pair<>(rpcLatencySelector, rpcLatencyView)); - res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyView)); - res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeView)); - res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeView)); + res.add(new Pair<>(rpcLatencySelector, rpcLatencyViewBuilder)); + res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyViewBuilder)); + res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeViewBuilder)); + res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeViewBuilder)); return res; } -- 2.32.0.windows.2 From a4bcc2a74d8bec9c9d34565536e87df06e0b11c1 Mon Sep 17 00:00:00 2001 From: Ziyi Tan <tanziyi0925@gmail.com> Date: Thu, 17 Aug 2023 13:53:48 +0800 Subject: [PATCH 2/7] [ISSUE #7178] refresh metadata after broker startup Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com> --- .../rocketmq/broker/BrokerController.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 30b1d2299..13f9d002b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -663,7 +663,7 @@ public class BrokerController { BrokerController.this.getSlaveSynchronize().syncAll(); lastSyncTimeMs = System.currentTimeMillis(); } - + //timer checkpoint, latency-sensitive, so sync it more frequently if (messageStoreConfig.isTimerWheelEnable()) { BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint(); @@ -698,17 +698,6 @@ public class BrokerController { initializeBrokerScheduledTasks(); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - BrokerController.this.brokerOuterAPI.refreshMetadata(); - } catch (Exception e) { - LOG.error("ScheduledTask refresh metadata exception", e); - } - } - }, 10, 5, TimeUnit.SECONDS); - if (this.brokerConfig.getNamesrvAddr() != null) { this.updateNamesrvAddr(); LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); @@ -1682,6 +1671,17 @@ public class BrokerController { if (brokerConfig.isSkipPreOnline()) { startServiceWithoutCondition(); } + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.brokerOuterAPI.refreshMetadata(); + } catch (Exception e) { + LOG.error("ScheduledTask refresh metadata exception", e); + } + } + }, 10, 5, TimeUnit.SECONDS); } protected void scheduleSendHeartbeat() { -- 2.32.0.windows.2 From 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Thu, 17 Aug 2023 13:59:04 +0800 Subject: [PATCH 3/7] [ISSUE #7201] Remove the DefaultMessageStore.class dependency in TransientStorePool Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../rocketmq/store/AllocateMappedFileService.java | 6 +++--- .../apache/rocketmq/store/DefaultMessageStore.java | 7 +++++-- .../apache/rocketmq/store/TransientStorePool.java | 13 ++++--------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index dca7d5325..c8420fea1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread { if (this.messageStore.isTransientStorePoolEnable()) { if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool - canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size(); + canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size(); } } @@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread { if (nextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + - "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums()); + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); this.requestTable.remove(nextFilePath); return null; } @@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread { if (nextNextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + - "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums()); + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); this.requestTable.remove(nextNextFilePath); } else { boolean offerOK = this.requestQueue.offer(nextNextReq); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 6115ead59..f2a54ddf6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore { this.reputMessageService = new ConcurrentReputMessageService(); } - this.transientStorePool = new TransientStorePool(this); + this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog()); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); @@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore { } public int remainTransientStoreBufferNumbs() { - return this.transientStorePool.availableBufferNums(); + if (this.isTransientStorePoolEnable()) { + return this.transientStorePool.availableBufferNums(); + } + return Integer.MAX_VALUE; } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java index 8c1a5338b..0d42ee69e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java @@ -33,13 +33,11 @@ public class TransientStorePool { private final int poolSize; private final int fileSize; private final Deque<ByteBuffer> availableBuffers; - private final DefaultMessageStore messageStore; private volatile boolean isRealCommit = true; - public TransientStorePool(final DefaultMessageStore messageStore) { - this.messageStore = messageStore; - this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize(); - this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + public TransientStorePool(final int poolSize, final int fileSize) { + this.poolSize = poolSize; + this.fileSize = fileSize; this.availableBuffers = new ConcurrentLinkedDeque<>(); } @@ -81,10 +79,7 @@ public class TransientStorePool { } public int availableBufferNums() { - if (messageStore.isTransientStorePoolEnable()) { - return availableBuffers.size(); - } - return Integer.MAX_VALUE; + return availableBuffers.size(); } public boolean isRealCommit() { -- 2.32.0.windows.2 From 2b93e1e32fd458d9df2091e89ea259ddd4d54061 Mon Sep 17 00:00:00 2001 From: iamgd67 <iamgd67@sina.com> Date: Thu, 17 Aug 2023 15:31:14 +0800 Subject: [PATCH 4/7] Update mqbroker to use runbroker.sh instead of runserver.sh when use --enable-proxy (#7150) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update mqbroker to use runbroker.sh instead of runserver.sh when enabling `--enable-proxy` this allow JVM `heap` and `gc` configuration using broker's settings instead of other common serverices'(proxy,namenode, etc). our main purpose, like the filename `mqbroker` suggest, is to start broker (which embeds a proxy), so use broker's config is reasonable chinese version mqbroker的--enable-proxy选项是启动内嵌了proxy的broker,而不是内嵌broker的proxy,而且broker的工作量和重要程度大于proxy,所以使用broker的gc和heap配置更合适 --- distribution/bin/mqbroker | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker index 3758ed597..35eb93c44 100644 --- a/distribution/bin/mqbroker +++ b/distribution/bin/mqbroker @@ -68,11 +68,11 @@ if [ "$enable_proxy" = true ]; then if [ "$broker_config" != "" ]; then args_for_proxy=${args_for_proxy}" -bc "${broker_config} fi - sh ${ROCKETMQ_HOME}/bin/runserver.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy} + sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy} else args_for_broker=$other_args if [ "$broker_config" != "" ]; then args_for_broker=${args_for_broker}" -c "${broker_config} fi sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup ${args_for_broker} -fi \ No newline at end of file +fi -- 2.32.0.windows.2 From 05e7cde610255ed9410fffb0f153efe7c2c8a326 Mon Sep 17 00:00:00 2001 From: yao-wenbin <ywb992134@163.com> Date: Fri, 18 Aug 2023 09:49:59 +0800 Subject: [PATCH 5/7] [ISSUE #7042] maven-compile job failed, Because TlsTest's serverRejectsSSLClient test case will throw TooLongFrameException (#7179) --- .../remoting/netty/NettyRemotingServer.java | 2 +- .../java/org/apache/rocketmq/remoting/TlsTest.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 90e358ce3..17f138f86 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -502,7 +502,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti case DISABLED: ctx.close(); log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode"); - break; + throw new UnsupportedOperationException("The NettyRemotingServer in SSL disabled mode doesn't support ssl client"); case PERMISSIVE: case ENFORCING: if (null != sslContext) { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java index de7edbbfb..a4890d73d 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java @@ -144,8 +144,13 @@ public class TlsTest { tlsClientKeyPath = ""; tlsClientCertPath = ""; clientConfig.setUseTLS(false); - } else if ("serverRejectsSSLClient".equals(name.getMethodName())) { + } else if ("disabledServerRejectsSSLClient".equals(name.getMethodName())) { tlsMode = TlsMode.DISABLED; + } else if ("disabledServerAcceptUnAuthClient".equals(name.getMethodName())) { + tlsMode = TlsMode.DISABLED; + tlsClientKeyPath = ""; + tlsClientCertPath = ""; + clientConfig.setUseTLS(false); } else if ("reloadSslContextForServer".equals(name.getMethodName())) { tlsClientAuthServer = false; tlsServerNeedClientAuth = "none"; @@ -211,7 +216,7 @@ public class TlsTest { } @Test - public void serverRejectsSSLClient() throws Exception { + public void disabledServerRejectsSSLClient() throws Exception { try { RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 1000 * 5); failBecauseExceptionWasNotThrown(RemotingSendRequestException.class); @@ -219,6 +224,11 @@ public class TlsTest { } } + @Test + public void disabledServerAcceptUnAuthClient() throws Exception { + requestThenAssertResponse(); + } + /** * Tests that a server configured to require client authentication refuses to accept connections * from a client that has an untrusted certificate. -- 2.32.0.windows.2 From 72d796f2b20b3ec6aebca8c004d9275d7c749a95 Mon Sep 17 00:00:00 2001 From: lk <xdkxlk@outlook.com> Date: Fri, 18 Aug 2023 11:55:39 +0800 Subject: [PATCH 6/7] [ISSUE #7205] support batch ack for pop orderly (#7206) --- .../broker/processor/AckMessageProcessor.java | 99 ++++++----- .../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++-- .../test/client/rmq/RMQPopClient.java | 22 +++ .../client/consumer/pop/BasePopNormally.java | 6 + .../test/client/consumer/pop/BatchAckIT.java | 159 ++++++++++++++++++ 5 files changed, 322 insertions(+), 55 deletions(-) create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 687811409..244b459d6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.BitSet; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; @@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor { invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { - // order - String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; - long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); - if (ackOffset < oldOffset) { - return; - } - while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { - } - try { - oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); - if (ackOffset < oldOffset) { - return; - } - long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( - topic, consumeGroup, - qId, ackOffset, - popTime); - if (nextOffset > -1) { - if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( - topic, consumeGroup, qId)) { - this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), - consumeGroup, topic, qId, nextOffset); - } - if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, - consumeGroup, qId, invisibleTime)) { - this.brokerController.getPopMessageProcessor().notifyMessageArriving( - topic, consumeGroup, qId); - } - } else if (nextOffset == -1) { - String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", - lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); - POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark(errorInfo); - return; - } - } finally { - this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); - } - brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); + ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response); return; } @@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor { } BatchAckMsg batchAckMsg = new BatchAckMsg(); - for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) { - if (!batchAck.getBitSet().get(i)) { - continue; + BitSet bitSet = batchAck.getBitSet(); + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + if (i == Integer.MAX_VALUE) { + break; } long offset = startOffset + i; if (offset < minOffset || offset > maxOffset) { continue; } - batchAckMsg.getAckOffsetList().add(offset); + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { + ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response); + } else { + batchAckMsg.getAckOffsetList().add(offset); + } } - if (batchAckMsg.getAckOffsetList().isEmpty()) { + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) { return; } @@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor { PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } + + protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) { + String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; + long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { + } + try { + oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( + topic, consumeGroup, + qId, ackOffset, + popTime); + if (nextOffset > -1) { + if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( + topic, consumeGroup, qId)) { + this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), + consumeGroup, topic, qId, nextOffset); + } + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, + consumeGroup, qId, invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving( + topic, consumeGroup, qId); + } + } else if (nextOffset == -1) { + String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", + lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(errorInfo); + return; + } + } finally { + this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); + } + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 5101ffc8e..213c26fd6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; @@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.BatchAck; +import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; +import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; @@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; -import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody; @@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; @@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook; import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; @@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { final String addr, final long timeOut, final AckCallback ackCallback, - final AckMessageRequestHeader requestHeader // + final AckMessageRequestHeader requestHeader + ) throws RemotingException, MQBrokerException, InterruptedException { + ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null); + } + + public void batchAckMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final String topic, + final String consumerGroup, + final List<String> extraInfoList + ) throws RemotingException, MQBrokerException, InterruptedException { + String brokerName = null; + Map<String, BatchAck> batchAckMap = new HashMap<>(); + for (String extraInfo : extraInfoList) { + String[] extraInfoData = ExtraInfoUtil.split(extraInfo); + if (brokerName == null) { + brokerName = ExtraInfoUtil.getBrokerName(extraInfoData); + } + String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + + ExtraInfoUtil.getQueueId(extraInfoData) + "@" + + ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + + ExtraInfoUtil.getPopTime(extraInfoData); + BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> { + BatchAck newBatchAck = new BatchAck(); + newBatchAck.setConsumerGroup(consumerGroup); + newBatchAck.setTopic(topic); + newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData)); + newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData)); + newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData)); + newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData)); + newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData)); + newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData)); + newBatchAck.setBitSet(new BitSet()); + return newBatchAck; + }); + bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); + } + + BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody(); + requestBody.setBrokerName(brokerName); + requestBody.setAcks(new ArrayList<>(batchAckMap.values())); + batchAckMessageAsync(addr, timeOut, ackCallback, requestBody); + } + + public void batchAckMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final BatchAckMessageRequestBody requestBody ) throws RemotingException, MQBrokerException, InterruptedException { - final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + ackMessageAsync(addr, timeOut, ackCallback, null, requestBody); + } + + protected void ackMessageAsync( + final String addr, + final long timeOut, + final AckCallback ackCallback, + final AckMessageRequestHeader requestHeader, + final BatchAckMessageRequestBody requestBody + ) throws RemotingException, MQBrokerException, InterruptedException { + RemotingCommand request; + if (requestHeader != null) { + request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + } else { + request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + if (requestBody != null) { + request.setBody(requestBody.encode()); + } + } this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) { @Override diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java index 496bd6da4..09c60c0b4 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.client.rmq; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.AckCallback; @@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer { return future; } + public CompletableFuture<AckResult> batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup, + List<String> extraInfoList) { + CompletableFuture<AckResult> future = new CompletableFuture<>(); + try { + this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() { + @Override + public void onSuccess(AckResult ackResult) { + future.complete(ackResult); + } + + @Override + public void onException(Throwable e) { + future.completeExceptionally(e); + } + }, topic, consumerGroup, extraInfoList); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic, String consumerGroup, String extraInfo, long invisibleTime) { String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java index 952fbe3f5..2e29b95a5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java @@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop { brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); } + + protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int maxNums) { + return client.popMessageAsync( + brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false, + ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java new file mode 100644 index 000000000..ec9153ccc --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.pop; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.constant.ConsumeInitMode; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQPopClient; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; + +public class BatchAckIT extends BasePop { + + protected String topic; + protected String group; + protected RMQNormalProducer producer = null; + protected RMQPopClient client = null; + protected String brokerAddr; + protected MessageQueue messageQueue; + + @Before + public void setUp() { + brokerAddr = brokerController1.getBrokerAddr(); + topic = MQRandomUtils.getRandomTopic(); + group = initConsumerGroup(); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); + producer = getProducer(NAMESRV_ADDR, topic); + client = getRMQPopClient(); + messageQueue = new MessageQueue(topic, BROKER1_NAME, -1); + } + + @After + public void tearDown() { + shutdown(); + } + + @Test + public void testBatchAckNormallyWithPopBuffer() throws Throwable { + brokerController1.getBrokerConfig().setEnablePopBufferMerge(true); + brokerController2.getBrokerConfig().setEnablePopBufferMerge(true); + + testBatchAck(() -> { + try { + return popMessageAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testBatchAckNormallyWithOutPopBuffer() throws Throwable { + brokerController1.getBrokerConfig().setEnablePopBufferMerge(false); + brokerController2.getBrokerConfig().setEnablePopBufferMerge(false); + + testBatchAck(() -> { + try { + return popMessageAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testBatchAckOrderly() throws Throwable { + testBatchAck(() -> { + try { + return popMessageOrderlyAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + public void testBatchAck(Supplier<PopResult> popResultSupplier) throws Throwable { + // Send 10 messages but do not ack, let them enter the retry topic + producer.send(10); + AtomicInteger firstMsgRcvNum = new AtomicInteger(); + await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + PopResult popResult = popResultSupplier.get(); + if (popResult.getPopStatus().equals(PopStatus.FOUND)) { + firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size()); + } + assertEquals(10, firstMsgRcvNum.get()); + }); + // sleep 6s, expect messages to enter the retry topic + TimeUnit.SECONDS.sleep(6); + + producer.send(20); + List<String> extraInfoList = new ArrayList<>(); + await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + PopResult popResult = popResultSupplier.get(); + if (popResult.getPopStatus().equals(PopStatus.FOUND)) { + for (MessageExt messageExt : popResult.getMsgFoundList()) { + extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK)); + } + } + assertEquals(30, extraInfoList.size()); + }); + + AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get(); + assertEquals(AckStatus.OK, ackResult.getStatus()); + + // sleep 6s, expected that messages that have been acked will not be re-consumed + TimeUnit.SECONDS.sleep(6); + PopResult popResult = popResultSupplier.get(); + assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus()); + } + + private CompletableFuture<PopResult> popMessageAsync() { + return client.popMessageAsync( + brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, + ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); + } + + private CompletableFuture<PopResult> popMessageOrderlyAsync() { + return client.popMessageAsync( + brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, + ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null); + } +} -- 2.32.0.windows.2 From cc16a1b51216e1e80c22011b8b01e060bb4af8b3 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Tue, 22 Aug 2023 10:42:25 +0800 Subject: [PATCH 7/7] Set table reference the same object for setSubscriptionGroupTable method (#7204) --- .../broker/subscription/SubscriptionGroupManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 74e39c0fe..e63b93058 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -341,10 +341,7 @@ public class SubscriptionGroupManager extends ConfigManager { public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { - this.subscriptionGroupTable.clear(); - for (String key : subscriptionGroupTable.keySet()) { - putSubscriptionGroupConfig(subscriptionGroupTable.get(key)); - } + this.subscriptionGroupTable = subscriptionGroupTable; } public boolean containsSubscriptionGroup(String group) { -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2