Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch009-backport-Support-KV-S...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch009-backport-Support-KV-Storage.patch of Package rocketmq
From 3a6ef0400c8f3dc420b8781c619e66d47d1c4336 Mon Sep 17 00:00:00 2001 From: fujian-zfj <2573259572@qq.com> Date: Sat, 5 Aug 2023 00:32:11 +0800 Subject: [PATCH 1/4] [ISSUE #7064] [RIP-66-1] Support KV(RocksDB) Storage for Metadata (#7092) * typo int readme[ecosystem] * rocksdb metadata * add unit test * fix testOffsetPersistInMemory * fix unit test * fix unit test * remove unused import * move RocksDBOffsetSerialize to broker moudle * Fix bazel build scripts Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * Flag QueryMsgByKeyIT as flaky as it fails at frequency: 5 out of 32 Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> * change public to private of some inner method --------- Signed-off-by: Li Zhanhui <lizhanhui@gmail.com> Co-authored-by: Li Zhanhui <lizhanhui@gmail.com> --- WORKSPACE | 1 + broker/BUILD.bazel | 3 + .../rocketmq/broker/BrokerController.java | 41 +- .../broker/offset/ConsumerOffsetManager.java | 20 +- .../offset/RocksDBConsumerOffsetManager.java | 102 +++ .../RocksDBLmqConsumerOffsetManager.java | 103 +++ .../offset/RocksDBOffsetSerializeWrapper.java | 34 + .../schedule/ScheduleMessageService.java | 5 +- .../RocksDBLmqSubscriptionGroupManager.java | 46 ++ .../RocksDBSubscriptionGroupManager.java | 112 ++++ .../SubscriptionGroupManager.java | 64 +- .../topic/RocksDBLmqTopicConfigManager.java | 57 ++ .../topic/RocksDBTopicConfigManager.java | 95 +++ .../broker/topic/TopicConfigManager.java | 110 ++-- .../src/main/resources/rmq.broker.logback.xml | 37 ++ .../RocksDBConsumerOffsetManagerTest.java | 113 ++++ .../processor/AdminBrokerProcessorTest.java | 126 +++- .../ForbiddenTest.java | 3 +- .../SubscriptionGroupManagerTest.java | 25 + .../topic/RocksdbTopicConfigManagerTest.java | 375 +++++++++++ client/BUILD.bazel | 1 + common/BUILD.bazel | 1 + common/pom.xml | 4 + .../apache/rocketmq/common/ConfigManager.java | 22 +- .../common/config/AbstractRocksDBStorage.java | 613 ++++++++++++++++++ .../common/config/ConfigRocksDBStorage.java | 250 +++++++ .../common/config/RocksDBConfigManager.java | 108 +++ .../rocketmq/common/constant/LoggerName.java | 1 + .../rocketmq/example/quickstart/Consumer.java | 3 +- pom.xml | 6 + remoting/BUILD.bazel | 1 + .../org/apache/rocketmq/store/StoreType.java | 32 + .../store/config/MessageStoreConfig.java | 41 ++ test/BUILD.bazel | 3 + tieredstore/BUILD.bazel | 1 + 35 files changed, 2473 insertions(+), 86 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java rename broker/src/test/java/org/apache/rocketmq/broker/{substription => subscription}/ForbiddenTest.java (95%) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java create mode 100644 store/src/main/java/org/apache/rocketmq/store/StoreType.java diff --git a/WORKSPACE b/WORKSPACE index e3a8f37dc..a8a0aafe9 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -105,6 +105,7 @@ maven_install( "com.fasterxml.jackson.core:jackson-databind:2.13.4.2", "com.adobe.testing:s3mock-junit4:2.11.0", "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0", + "io.github.aliyunmq:rocketmq-rocksdb:1.0.3", ], fetch_sources = True, repositories = [ diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index d0d3a2f96..6adcdc7b9 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -53,6 +53,8 @@ java_library( "@maven//:io_github_aliyunmq_rocketmq_logback_classic", "@maven//:org_slf4j_jul_to_slf4j", "@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge", + "@maven//:io_github_aliyunmq_rocketmq_rocksdb", + "@maven//:net_java_dev_jna_jna", ], ) @@ -81,6 +83,7 @@ java_library( "@maven//:org_apache_commons_commons_lang3", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:org_powermock_powermock_core", + "@maven//:io_opentelemetry_opentelemetry_api", ], ) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 972457194..30b1d2299 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -45,6 +45,8 @@ import org.apache.rocketmq.broker.offset.BroadcastOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.processor.AckMessageProcessor; @@ -66,8 +68,12 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; +import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager; +import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; @@ -120,6 +126,7 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.dledger.DLedgerCommitLog; @@ -301,9 +308,16 @@ public class BrokerController { this.messageStoreConfig = messageStoreConfig; this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort())); this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); - this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this); this.broadcastOffsetManager = new BroadcastOffsetManager(this); - this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); + if (isEnableRocksDBStore()) { + this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this); + this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this); + this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this); + } else { + this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); + this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); + this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this); + } this.topicQueueMappingManager = new TopicQueueMappingManager(this); this.pullMessageProcessor = new PullMessageProcessor(this); this.peekMessageProcessor = new PeekMessageProcessor(this); @@ -324,7 +338,6 @@ public class BrokerController { this.popInflightMessageCounter = new PopInflightMessageCounter(this); this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); - this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); this.scheduleMessageService = new ScheduleMessageService(this); this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this); this.coldDataCgCtrService = new ColdDataCgCtrService(this); @@ -1383,8 +1396,6 @@ public class BrokerController { this.adminBrokerExecutor.shutdown(); } - this.consumerOffsetManager.persist(); - if (this.brokerFastFailure != null) { this.brokerFastFailure.shutdown(); } @@ -1449,8 +1460,20 @@ public class BrokerController { shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService); shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService); - this.topicConfigManager.persist(); - this.subscriptionGroupManager.persist(); + if (this.topicConfigManager != null) { + this.topicConfigManager.persist(); + this.topicConfigManager.stop(); + } + + if (this.subscriptionGroupManager != null) { + this.subscriptionGroupManager.persist(); + this.subscriptionGroupManager.stop(); + } + + if (this.consumerOffsetManager != null) { + this.consumerOffsetManager.persist(); + this.consumerOffsetManager.stop(); + } for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) { if (brokerAttachedPlugin != null) { @@ -2375,4 +2398,8 @@ public class BrokerController { public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) { this.coldDataCgCtrService = coldDataCgCtrService; } + + public boolean isEnableRocksDBStore() { + return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType()); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 8bf4e9a59..21f20dde3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.offset; -import com.google.common.base.Strings; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -26,6 +25,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Strings; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -37,12 +39,12 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerOffsetManager extends ConfigManager { - private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public static final String TOPIC_GROUP_SEPARATOR = "@"; private DataVersion dataVersion = new DataVersion(); - private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = + protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512); private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable = @@ -62,6 +64,10 @@ public class ConsumerOffsetManager extends ConfigManager { this.brokerController = brokerController; } + protected void removeConsumerOffset(String topicAtGroup) { + + } + public void cleanOffset(String group) { Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { @@ -71,6 +77,7 @@ public class ConsumerOffsetManager extends ConfigManager { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2 && group.equals(arrays[1])) { it.remove(); + removeConsumerOffset(topicAtGroup); LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue()); } } @@ -86,6 +93,7 @@ public class ConsumerOffsetManager extends ConfigManager { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2 && topic.equals(arrays[0])) { it.remove(); + removeConsumerOffset(topicAtGroup); LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue()); } } @@ -105,6 +113,7 @@ public class ConsumerOffsetManager extends ConfigManager { if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) && this.offsetBehindMuchThanData(topic, next.getValue())) { it.remove(); + removeConsumerOffset(topicAtGroup); LOG.warn("remove topic offset, {}", topicAtGroup); } } @@ -313,8 +322,10 @@ public class ConsumerOffsetManager extends ConfigManager { for (String group : filterGroups.split(",")) { Iterator<String> it = topicGroups.iterator(); while (it.hasNext()) { - if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) { + String topicAtGroup = it.next(); + if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) { it.remove(); + removeConsumerOffset(topicAtGroup); } } } @@ -371,6 +382,7 @@ public class ConsumerOffsetManager extends ConfigManager { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2 && group.equals(arrays[1])) { it.remove(); + removeConsumerOffset(topicAtGroup); LOG.warn("clean group offset {}", topicAtGroup); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java new file mode 100644 index 000000000..5695a3356 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.io.File; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; +import org.rocksdb.WriteBatch; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { + + public RocksDBConsumerOffsetManager(BrokerController brokerController) { + super(brokerController); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + return this.rocksDBConfigManager.load(configFilePath(), this::decode0); + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected void removeConsumerOffset(String topicAtGroup) { + try { + byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset); + this.rocksDBConfigManager.delete(keyBytes); + } catch (Exception e) { + LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup); + } + } + + @Override + protected void decode0(final byte[] key, final byte[] body) { + String topicAtGroup = new String(key, DataConverter.charset); + RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class); + + this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable()); + LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator; + } + + @Override + public synchronized void persist() { + WriteBatch writeBatch = new WriteBatch(); + try { + Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next(); + putWriteBatch(writeBatch, entry.getKey(), entry.getValue()); + + if (writeBatch.getDataSize() >= 4 * 1024) { + this.rocksDBConfigManager.batchPutWithWal(writeBatch); + } + } + this.rocksDBConfigManager.batchPutWithWal(writeBatch); + this.rocksDBConfigManager.flushWAL(); + } catch (Exception e) { + LOG.error("consumer offset persist Failed", e); + } finally { + writeBatch.close(); + } + } + + private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception { + byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset); + RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper(); + wrapper.setOffsetTable(offsetMap); + byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible); + writeBatch.put(keyBytes, valueBytes); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java new file mode 100644 index 000000000..d0faa6614 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class RocksDBLmqConsumerOffsetManager extends RocksDBConsumerOffsetManager { + private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512); + + public RocksDBLmqConsumerOffsetManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public long queryOffset(final String group, final String topic, final int queueId) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic, queueId); + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + return offset; + } + return -1; + } + + @Override + public Map<Integer, Long> queryOffset(final String group, final String topic) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic); + } + Map<Integer, Long> map = new HashMap<>(); + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + map.put(0, offset); + } + return map; + } + + @Override + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, + final long offset) { + if (!MixAll.isLmq(group)) { + super.commitOffset(clientHost, group, topic, queueId, offset); + return; + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + lmqOffsetTable.put(key, offset); + } + + @Override + public String encode() { + return this.encode(false); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + RocksDBLmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, RocksDBLmqConsumerOffsetManager.class); + if (obj != null) { + super.setOffsetTable(obj.getOffsetTable()); + this.lmqOffsetTable = obj.lmqOffsetTable; + } + } + } + + @Override + public String encode(final boolean prettyFormat) { + return RemotingSerializable.toJson(this, prettyFormat); + } + + public ConcurrentHashMap<String, Long> getLmqOffsetTable() { + return lmqOffsetTable; + } + + public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) { + this.lmqOffsetTable = lmqOffsetTable; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java new file mode 100644 index 000000000..7a90fd62f --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { + private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); + + public ConcurrentMap<Integer, Long> getOffsetTable() { + return offsetTable; + } + + public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { + this.offsetTable = offsetTable; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 26f09dcd0..aed0ee19f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -92,7 +92,7 @@ public class ScheduleMessageService extends ConfigManager { this.brokerController = brokerController; this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); scheduledPersistService = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); + new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); } public static int queueId2DelayLevel(final int queueId) { @@ -169,7 +169,7 @@ public class ScheduleMessageService extends ConfigManager { ThreadUtils.shutdown(scheduledPersistService); } - public void stop() { + public boolean stop() { if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) { this.deliverExecutorService.shutdown(); try { @@ -193,6 +193,7 @@ public class ScheduleMessageService extends ConfigManager { this.persist(); } + return true; } public boolean isStarted() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java new file mode 100644 index 000000000..8c05d0bd9 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.subscription; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class RocksDBLmqSubscriptionGroupManager extends RocksDBSubscriptionGroupManager { + + public RocksDBLmqSubscriptionGroupManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { + if (MixAll.isLmq(group)) { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(group); + return subscriptionGroupConfig; + } + return super.findSubscriptionGroupConfig(group); + } + + @Override + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { + if (config == null || MixAll.isLmq(config.getGroupName())) { + return; + } + super.updateSubscriptionGroupConfig(config); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java new file mode 100644 index 000000000..6503970af --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.subscription; + +import java.io.File; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { + + public RocksDBSubscriptionGroupManager(BrokerController brokerController) { + super(brokerController, false); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + return false; + } + this.init(); + return true; + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + String groupName = subscriptionGroupConfig.getGroupName(); + SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig); + + try { + byte[] keyBytes = groupName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString()); + } + return oldConfig; + } + + @Override + protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) { + String groupName = subscriptionGroupConfig.getGroupName(); + SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig); + if (oldConfig == null) { + try { + byte[] keyBytes = groupName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString()); + } + } + return oldConfig; + } + + @Override + protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) { + SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName); + try { + this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.charset)); + } catch (Exception e) { + log.error("kv delete sub Failed, {}", subscriptionGroupConfig.toString()); + } + return subscriptionGroupConfig; + } + + @Override + protected void decode0(byte[] key, byte[] body) { + String groupName = new String(key, DataConverter.charset); + SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class); + + this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig); + log.info("load exist local sub, {}", subscriptionGroupConfig.toString()); + } + + @Override + public synchronized void persist() { + if (brokerController.getMessageStoreConfig().isRealTimePersistRocksDBConfig()) { + this.rocksDBConfigManager.flushWAL(); + } + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 0ae11313f..74e39c0fe 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -40,81 +40,103 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class SubscriptionGroupManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = + protected ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>(1024); private ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>(4); private final DataVersion dataVersion = new DataVersion(); - private transient BrokerController brokerController; + protected transient BrokerController brokerController; public SubscriptionGroupManager() { this.init(); } public SubscriptionGroupManager(BrokerController brokerController) { + this(brokerController, true); + } + + public SubscriptionGroupManager(BrokerController brokerController, boolean init) { this.brokerController = brokerController; - this.init(); + if (init) { + init(); + } } - private void init() { + protected void init() { { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_SYS_RMQ_TRANS); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_SYS_RMQ_TRANS, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } } + protected SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + return this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig); + } + + protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) { + return this.subscriptionGroupTable.putIfAbsent(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig); + } + + protected SubscriptionGroupConfig getSubscriptionGroupConfig(String groupName) { + return this.subscriptionGroupTable.get(groupName); + } + + protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) { + return this.subscriptionGroupTable.remove(groupName); + } + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { Map<String, String> newAttributes = request(config); Map<String, String> currentAttributes = current(config.getGroupName()); @@ -127,7 +149,7 @@ public class SubscriptionGroupManager extends ConfigManager { config.setAttributes(finalAttributes); - SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); + SubscriptionGroupConfig old = putSubscriptionGroupConfig(config); if (old != null) { log.info("update subscription group config, old: {} new: {}", old, config); } else { @@ -218,7 +240,7 @@ public class SubscriptionGroupManager extends ConfigManager { } public void disableConsume(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName); + SubscriptionGroupConfig old = getSubscriptionGroupConfig(groupName); if (old != null) { old.setConsumeEnable(false); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; @@ -227,7 +249,7 @@ public class SubscriptionGroupManager extends ConfigManager { } public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { - SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group); + SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group); if (null == subscriptionGroupConfig) { if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) { @@ -235,7 +257,7 @@ public class SubscriptionGroupManager extends ConfigManager { } subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(group); - SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig); + SubscriptionGroupConfig preConfig = putSubscriptionGroupConfigIfAbsent(subscriptionGroupConfig); if (null == preConfig) { log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString()); } @@ -305,7 +327,7 @@ public class SubscriptionGroupManager extends ConfigManager { } public void deleteSubscriptionGroupConfig(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); + SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName); this.forbiddenTable.remove(groupName); if (old != null) { log.info("delete subscription group OK, subscription group:{}", old); @@ -317,8 +339,12 @@ public class SubscriptionGroupManager extends ConfigManager { } } + public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { - this.subscriptionGroupTable = subscriptionGroupTable; + this.subscriptionGroupTable.clear(); + for (String key : subscriptionGroupTable.keySet()) { + putSubscriptionGroupConfig(subscriptionGroupTable.get(key)); + } } public boolean containsSubscriptionGroup(String group) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java new file mode 100644 index 000000000..d049a8dbc --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; + +public class RocksDBLmqTopicConfigManager extends RocksDBTopicConfigManager { + + public RocksDBLmqTopicConfigManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public TopicConfig selectTopicConfig(final String topic) { + if (MixAll.isLmq(topic)) { + return simpleLmqTopicConfig(topic); + } + return super.selectTopicConfig(topic); + } + + @Override + public void updateTopicConfig(final TopicConfig topicConfig) { + if (topicConfig == null || MixAll.isLmq(topicConfig.getTopicName())) { + return; + } + super.updateTopicConfig(topicConfig); + } + + @Override + public boolean containsTopic(String topic) { + if (MixAll.isLmq(topic)) { + return true; + } + return super.containsTopic(topic); + } + + private TopicConfig simpleLmqTopicConfig(String topic) { + return new TopicConfig(topic, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java new file mode 100644 index 000000000..7da0d7c8a --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import java.io.File; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBTopicConfigManager extends TopicConfigManager { + + public RocksDBTopicConfigManager(BrokerController brokerController) { + super(brokerController, false); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + return false; + } + this.init(); + return true; + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected void decode0(byte[] key, byte[] body) { + String topicName = new String(key, DataConverter.charset); + TopicConfig topicConfig = JSON.parseObject(body, TopicConfig.class); + + this.topicConfigTable.put(topicName, topicConfig); + log.info("load exist local topic, {}", topicConfig.toString()); + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator; + } + + @Override + protected TopicConfig putTopicConfig(TopicConfig topicConfig) { + String topicName = topicConfig.getTopicName(); + TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, topicConfig); + try { + byte[] keyBytes = topicName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put topic Failed, {}", topicConfig.toString(), e); + } + return oldTopicConfig; + } + + @Override + protected TopicConfig removeTopicConfig(String topicName) { + TopicConfig topicConfig = this.topicConfigTable.remove(topicName); + try { + this.rocksDBConfigManager.delete(topicName.getBytes(DataConverter.charset)); + } catch (Exception e) { + log.error("kv remove topic Failed, {}", topicConfig.toString()); + } + return topicConfig; + } + + @Override + public synchronized void persist() { + if (brokerController.getMessageStoreConfig().isRealTimePersistRocksDBConfig()) { + this.rocksDBConfigManager.flushWAL(); + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index e90530512..1c3b9711f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.topic; -import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -27,6 +26,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.collect.ImmutableMap; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; @@ -50,27 +52,38 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import static com.google.common.base.Preconditions.checkNotNull; public class TopicConfigManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; private transient final Lock topicConfigTableLock = new ReentrantLock(); - private ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(1024); + protected ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(1024); private DataVersion dataVersion = new DataVersion(); - private transient BrokerController brokerController; + protected transient BrokerController brokerController; public TopicConfigManager() { + } public TopicConfigManager(BrokerController brokerController) { + this(brokerController, true); + } + + public TopicConfigManager(BrokerController brokerController, boolean init) { this.brokerController = brokerController; + if (init) { + init(); + } + } + + protected void init() { { String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { @@ -83,7 +96,7 @@ public class TopicConfigManager extends ConfigManager { .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } { @@ -92,7 +105,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1024); topicConfig.setWriteQueueNums(1024); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); @@ -103,7 +116,7 @@ public class TopicConfigManager extends ConfigManager { perm |= PermName.PERM_READ | PermName.PERM_WRITE; } topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { @@ -117,7 +130,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT; @@ -125,7 +138,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; @@ -133,7 +146,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { @@ -142,7 +155,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } { @@ -151,7 +164,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // PopAckConstants.REVIVE_TOPIC @@ -160,7 +173,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum()); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // sync broker member group topic @@ -170,7 +183,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); topicConfig.setPerm(PermName.PERM_INHERIT); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC @@ -179,7 +192,7 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { @@ -189,12 +202,24 @@ public class TopicConfigManager extends ConfigManager { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } + protected TopicConfig putTopicConfig(TopicConfig topicConfig) { + return this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + protected TopicConfig getTopicConfig(String topicName) { + return this.topicConfigTable.get(topicName); + } + + protected TopicConfig removeTopicConfig(String topicName) { + return this.topicConfigTable.remove(topicName); + } + public TopicConfig selectTopicConfig(final String topic) { - return this.topicConfigTable.get(topic); + return getTopicConfig(topic); } public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, @@ -205,12 +230,12 @@ public class TopicConfigManager extends ConfigManager { try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(topic); + topicConfig = getTopicConfig(topic); if (topicConfig != null) { return topicConfig; } - TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); + TopicConfig defaultTopicConfig = getTopicConfig(defaultTopic); if (defaultTopicConfig != null) { if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { @@ -247,7 +272,7 @@ public class TopicConfigManager extends ConfigManager { log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -287,12 +312,12 @@ public class TopicConfigManager extends ConfigManager { try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - TopicConfig existedTopicConfig = this.topicConfigTable.get(topicConfig.getTopicName()); + TopicConfig existedTopicConfig = getTopicConfig(topicConfig.getTopicName()); if (existedTopicConfig != null) { return existedTopicConfig; } log.info("Create new topic [{}] config:[{}]", topicConfig.getTopicName(), topicConfig); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); createNew = true; @@ -305,13 +330,9 @@ public class TopicConfigManager extends ConfigManager { log.error("createTopicIfAbsent ", e); } if (createNew && register) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); } - return this.topicConfigTable.get(topicConfig.getTopicName()); + return getTopicConfig(topicConfig.getTopicName()); } public TopicConfig createTopicInSendMessageBackMethod( @@ -328,7 +349,7 @@ public class TopicConfigManager extends ConfigManager { final int perm, final boolean isOrder, final int topicSysFlag) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { if (isOrder != topicConfig.isOrder()) { topicConfig.setOrder(isOrder); @@ -342,7 +363,7 @@ public class TopicConfigManager extends ConfigManager { try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(topic); + topicConfig = getTopicConfig(topic); if (topicConfig != null) { return topicConfig; } @@ -355,7 +376,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setOrder(isOrder); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); createNew = true; long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -376,7 +397,7 @@ public class TopicConfigManager extends ConfigManager { } public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) { - TopicConfig topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + TopicConfig topicConfig = getTopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -385,7 +406,7 @@ public class TopicConfigManager extends ConfigManager { try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = getTopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -396,7 +417,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(0); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); + putTopicConfig(topicConfig); createNew = true; long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -418,7 +439,7 @@ public class TopicConfigManager extends ConfigManager { public void updateTopicUnitFlag(final String topic, final boolean unit) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { int oldTopicSysFlag = topicConfig.getTopicSysFlag(); if (unit) { @@ -430,7 +451,7 @@ public class TopicConfigManager extends ConfigManager { log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag()); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -441,7 +462,7 @@ public class TopicConfigManager extends ConfigManager { } public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { int oldTopicSysFlag = topicConfig.getTopicSysFlag(); if (hasUnitSub) { @@ -453,7 +474,7 @@ public class TopicConfigManager extends ConfigManager { log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag()); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -469,6 +490,7 @@ public class TopicConfigManager extends ConfigManager { Map<String, String> newAttributes = request(topicConfig); Map<String, String> currentAttributes = current(topicConfig.getTopicName()); + Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes( this.topicConfigTable.get(topicConfig.getTopicName()) == null, TopicAttributes.ALL, @@ -477,7 +499,7 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setAttributes(finalAttributes); - TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + TopicConfig old = putTopicConfig(topicConfig); if (old != null) { log.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { @@ -496,7 +518,7 @@ public class TopicConfigManager extends ConfigManager { boolean isChange = false; Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); for (String topic : orderTopics) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; @@ -534,7 +556,7 @@ public class TopicConfigManager extends ConfigManager { } public boolean isOrderTopic(final String topic) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig == null) { return false; } else { @@ -543,7 +565,7 @@ public class TopicConfigManager extends ConfigManager { } public void deleteTopicConfig(final String topic) { - TopicConfig old = this.topicConfigTable.remove(topic); + TopicConfig old = removeTopicConfig(topic); if (old != null) { log.info("delete topic config OK, topic: {}", old); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; @@ -619,7 +641,7 @@ public class TopicConfigManager extends ConfigManager { } private Map<String, String> current(String topic) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig == null) { return new HashMap<>(); } else { diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml index 78b1aea41..7d49f6664 100644 --- a/broker/src/main/resources/rmq.broker.logback.xml +++ b/broker/src/main/resources/rmq.broker.logback.xml @@ -145,6 +145,39 @@ <appender-ref ref="RocketmqWaterMarkSiftingAppender_inner"/> </appender> + <appender name="RocketmqRocksDBSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender"> + <discriminator> + <key>brokerContainerLogDir</key> + <defaultValue>${file.separator}</defaultValue> + </discriminator> + <sift> + <appender name="RocketmqStoreAppender" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file> + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}rocksdb.log + </file> + <append>true</append> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern> + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}rocksdb.%i.log.gz + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>128MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + </sift> + </appender> + <appender name="RocketmqRocksDBSiftingAppender" class="ch.qos.logback.classic.AsyncAppender"> + <appender-ref ref="RocketmqRocksDBSiftingAppender_inner"/> + </appender> + <appender name="RocketmqStoreSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender"> <discriminator> <key>brokerContainerLogDir</key> @@ -579,6 +612,10 @@ <appender-ref ref="RocketmqBrokerSiftingAppender"/> </logger> + <logger name="RocketmqRocksDB" additivity="false" level="INFO"> + <appender-ref ref="RocketmqRocksDBSiftingAppender"/> + </logger> + <logger name="RocketmqStore" additivity="false" level="INFO"> <appender-ref ref="RocketmqStoreSiftingAppender"/> </logger> diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java new file mode 100644 index 000000000..58b690c9a --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.offset; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RocksDBConsumerOffsetManagerTest { + + private static final String KEY = "FooBar@FooBarGroup"; + + private BrokerController brokerController; + + private ConsumerOffsetManager consumerOffsetManager; + + @Before + public void init() { + if (notToBeExecuted()) { + return; + } + brokerController = Mockito.mock(BrokerController.class); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + + consumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController); + consumerOffsetManager.load(); + + ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512); + offsetTable.put(KEY,new ConcurrentHashMap<Integer, Long>() {{ + put(1,2L); + put(2,3L); + }}); + consumerOffsetManager.setOffsetTable(offsetTable); + } + + @After + public void destroy() { + if (notToBeExecuted()) { + return; + } + if (consumerOffsetManager != null) { + consumerOffsetManager.stop(); + } + } + + @Test + public void cleanOffsetByTopic_NotExist() { + if (notToBeExecuted()) { + return; + } + consumerOffsetManager.cleanOffsetByTopic("InvalidTopic"); + assertThat(consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue(); + } + + @Test + public void cleanOffsetByTopic_Exist() { + if (notToBeExecuted()) { + return; + } + consumerOffsetManager.cleanOffsetByTopic("FooBar"); + assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue(); + } + + @Test + public void testOffsetPersistInMemory() { + if (notToBeExecuted()) { + return; + } + ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = consumerOffsetManager.getOffsetTable(); + ConcurrentMap<Integer, Long> table = new ConcurrentHashMap<>(); + table.put(0, 1L); + table.put(1, 3L); + String group = "G1"; + offsetTable.put(group, table); + + consumerOffsetManager.persist(); + consumerOffsetManager.stop(); + consumerOffsetManager.load(); + + ConcurrentMap<Integer, Long> offsetTableLoaded = consumerOffsetManager.getOffsetTable().get(group); + Assert.assertEquals(table, offsetTableLoaded); + } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index a470c0cf2..d33a217f7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -36,6 +36,8 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; +import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; @@ -76,6 +78,7 @@ import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.stats.BrokerStats; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -114,7 +117,7 @@ public class AdminBrokerProcessorTest { private SendMessageProcessor sendMessageProcessor; @Mock - private ConcurrentMap<TopicQueueId, LongAdder> inFlyWritingCouterMap; + private ConcurrentMap<TopicQueueId, LongAdder> inFlyWritingCounterMap; private Set<String> systemTopicSet; private String topic; @@ -162,6 +165,40 @@ public class AdminBrokerProcessorTest { brokerController.getMessageStoreConfig().setTimerWheelEnable(false); } + @After + public void destroy() { + if (notToBeExecuted()) { + return; + } + if (brokerController.getSubscriptionGroupManager() != null) { + brokerController.getSubscriptionGroupManager().stop(); + } + if (brokerController.getTopicConfigManager() != null) { + brokerController.getTopicConfigManager().stop(); + } + if (brokerController.getConsumerOffsetManager() != null) { + brokerController.getConsumerOffsetManager().stop(); + } + } + + private void initRocksdbTopicManager() { + if (notToBeExecuted()) { + return; + } + RocksDBTopicConfigManager rocksDBTopicConfigManager = new RocksDBTopicConfigManager(brokerController); + brokerController.setTopicConfigManager(rocksDBTopicConfigManager); + rocksDBTopicConfigManager.load(); + } + + private void initRocksdbSubscriptionManager() { + if (notToBeExecuted()) { + return; + } + RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController); + brokerController.setSubscriptionGroupManager(rocksDBSubscriptionGroupManager); + rocksDBSubscriptionGroupManager.load(); + } + @Test public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException { RemotingCommand request = createUpdateBrokerConfigCommand(); @@ -177,6 +214,15 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); } + @Test + public void testUpdateAndCreateTopicInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testUpdateAndCreateTopic(); + } + @Test public void testUpdateAndCreateTopic() throws Exception { //test system topic @@ -197,7 +243,15 @@ public class AdminBrokerProcessorTest { request = buildCreateTopicRequest(topic); response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + @Test + public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testUpdateAndCreateTopicOnSlave(); } @Test @@ -217,6 +271,15 @@ public class AdminBrokerProcessorTest { "please execute it from master broker."); } + @Test + public void testDeleteTopicInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testDeleteTopic(); + } + @Test public void testDeleteTopic() throws Exception { //test system topic @@ -233,6 +296,15 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testDeleteTopicOnSlaveInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testDeleteTopicOnSlave(); + } + @Test public void testDeleteTopicOnSlave() throws Exception { // setup @@ -249,6 +321,15 @@ public class AdminBrokerProcessorTest { "please execute it from master broker."); } + @Test + public void testGetAllTopicConfigInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testGetAllTopicConfig(); + } + @Test public void testGetAllTopicConfig() throws Exception { GetAllTopicConfigResponseHeader getAllTopicConfigResponseHeader = new GetAllTopicConfigResponseHeader(); @@ -400,6 +481,12 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testUpdateAndCreateSubscriptionGroupInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); + testUpdateAndCreateSubscriptionGroup(); + } + @Test public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); @@ -415,6 +502,12 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); + testUpdateAndCreateSubscriptionGroupOnSlave(); + } + @Test public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException { // Setup @@ -439,6 +532,12 @@ public class AdminBrokerProcessorTest { "please execute it from master broker."); } + @Test + public void testGetAllSubscriptionGroupInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); + testGetAllSubscriptionGroup(); + } + @Test public void testGetAllSubscriptionGroup() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); @@ -446,6 +545,12 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testDeleteSubscriptionGroupInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); + testDeleteSubscriptionGroup(); + } + @Test public void testDeleteSubscriptionGroup() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null); @@ -455,6 +560,12 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); + testDeleteSubscriptionGroupOnSlave(); + } + @Test public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException { // Setup @@ -547,6 +658,15 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testGetTopicConfigInRocksdb() throws Exception { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicManager(); + testGetTopicConfig(); + } + @Test public void testGetTopicConfig() throws Exception { String topic = "foobar"; @@ -630,4 +750,8 @@ public class AdminBrokerProcessorTest { request.makeCustomHeaderToNet(); return request; } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/substription/ForbiddenTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/ForbiddenTest.java similarity index 95% rename from broker/src/test/java/org/apache/rocketmq/broker/substription/ForbiddenTest.java rename to broker/src/test/java/org/apache/rocketmq/broker/subscription/ForbiddenTest.java index 2ac5ee320..bdaee3b3c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/substription/ForbiddenTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/ForbiddenTest.java @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.rocketmq.broker.substription; +package org.apache.rocketmq.broker.subscription; import static org.junit.Assert.assertEquals; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java index 6337c69ea..3c829437c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java @@ -20,9 +20,12 @@ package org.apache.rocketmq.broker.subscription; import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SubscriptionGroupAttributes; import org.apache.rocketmq.common.attribute.BooleanAttribute; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -54,6 +57,28 @@ public class SubscriptionGroupManagerTest { doNothing().when(subscriptionGroupManager).persist(); } + @After + public void destroy() { + if (MixAll.isMac()) { + return; + } + if (subscriptionGroupManager != null) { + subscriptionGroupManager.stop(); + } + } + + @Test + public void testUpdateAndCreateSubscriptionGroupInRocksdb() { + if (MixAll.isMac()) { + return; + } + when(brokerControllerMock.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); + subscriptionGroupManager = spy(new RocksDBSubscriptionGroupManager(brokerControllerMock)); + subscriptionGroupManager.load(); + group += System.currentTimeMillis(); + updateSubscriptionGroupConfig(); + } + @Test public void updateSubscriptionGroupConfig() { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java new file mode 100644 index 000000000..ed71a3313 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicAttributes; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.Attribute; +import org.apache.rocketmq.common.attribute.BooleanAttribute; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.EnumAttribute; +import org.apache.rocketmq.common.attribute.LongRangeAttribute; +import org.apache.rocketmq.common.utils.QueueTypeUtils; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static com.google.common.collect.Sets.newHashSet; +import static java.util.Arrays.asList; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RocksdbTopicConfigManagerTest { + private RocksDBTopicConfigManager topicConfigManager; + @Mock + private BrokerController brokerController; + + @Mock + private DefaultMessageStore defaultMessageStore; + + @Before + public void init() { + if (notToBeExecuted()) { + return; + } + BrokerConfig brokerConfig = new BrokerConfig(); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); + when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); + topicConfigManager = new RocksDBTopicConfigManager(brokerController); + topicConfigManager.load(); + } + + @After + public void destroy() { + if (notToBeExecuted()) { + return; + } + if (topicConfigManager != null) { + topicConfigManager.stop(); + } + } + + @Test + public void testAddUnsupportedKeyOnCreating() { + if (notToBeExecuted()) { + return; + } + String unsupportedKey = "key4"; + String topicName = "testAddUnsupportedKeyOnCreating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+" + unsupportedKey, "value1"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("unsupported key: " + unsupportedKey, runtimeException.getMessage()); + } + + @Test + public void testAddWrongFormatKeyOnCreating() { + if (notToBeExecuted()) { + return; + } + String topicName = "testAddWrongFormatKeyOnCreating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("++enum.key", "value1"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("kv string format wrong.", runtimeException.getMessage()); + } + + @Test + public void testDeleteKeyOnCreating() { + if (notToBeExecuted()) { + return; + } + String topicName = "testDeleteKeyOnCreating-" + System.currentTimeMillis(); + + String key = "enum.key"; + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("-" + key, ""); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("only add attribute is supported while creating topic. key: " + key, runtimeException.getMessage()); + } + + @Test + public void testAddWrongValueOnCreating() { + if (notToBeExecuted()) { + return; + } + String topicName = "testAddWrongValueOnCreating-" + System.currentTimeMillis(); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "wrong-value"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("value is not in set: [SimpleCQ, BatchCQ]", runtimeException.getMessage()); + } + + @Test + public void testNormalAddKeyOnCreating() { + if (notToBeExecuted()) { + return; + } + String topic = "testNormalAddKeyOnCreating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+long.range.key", "16"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + topicConfigManager.updateTopicConfig(topicConfig); + + TopicConfig existingTopicConfig = topicConfigManager.getTopicConfigTable().get(topic); + Assert.assertEquals("enum-2", existingTopicConfig.getAttributes().get("enum.key")); + Assert.assertEquals("16", existingTopicConfig.getAttributes().get("long.range.key")); + // assert file + } + + @Test + public void testAddDuplicatedKeyOnUpdating() { + if (notToBeExecuted()) { + return; + } + String duplicatedKey = "long.range.key"; + String topicName = "testAddDuplicatedKeyOnUpdating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-3"); + attributes.put("+bool.key", "true"); + attributes.put("+long.range.key", "12"); + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + topicConfigManager.updateTopicConfig(topicConfig); + + + + attributes = new HashMap<>(); + attributes.put("+" + duplicatedKey, "11"); + attributes.put("-" + duplicatedKey, ""); + TopicConfig duplicateTopicConfig = new TopicConfig(); + duplicateTopicConfig.setTopicName(topicName); + duplicateTopicConfig.setAttributes(attributes); + + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(duplicateTopicConfig)); + Assert.assertEquals("alter duplication key. key: " + duplicatedKey, runtimeException.getMessage()); + } + + @Test + public void testDeleteNonexistentKeyOnUpdating() { + if (notToBeExecuted()) { + return; + } + String key = "nonexisting.key"; + String topicName = "testDeleteNonexistentKeyOnUpdating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+bool.key", "true"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + attributes = new HashMap<>(); + attributes.clear(); + attributes.put("-" + key, ""); + topicConfig.setAttributes(attributes); + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("attempt to delete a nonexistent key: " + key, runtimeException.getMessage()); + } + + @Test + public void testAlterTopicWithoutChangingAttributes() { + if (notToBeExecuted()) { + return; + } + String topic = "testAlterTopicWithoutChangingAttributes-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+enum.key", "enum-2"); + attributes.put("+bool.key", "true"); + + TopicConfig topicConfigInit = new TopicConfig(); + topicConfigInit.setTopicName(topic); + topicConfigInit.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfigInit); + Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key")); + Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key")); + + TopicConfig topicConfigAlter = new TopicConfig(); + topicConfigAlter.setTopicName(topic); + topicConfigAlter.setReadQueueNums(10); + topicConfigAlter.setWriteQueueNums(10); + topicConfigManager.updateTopicConfig(topicConfigAlter); + Assert.assertEquals("enum-2", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("enum.key")); + Assert.assertEquals("true", topicConfigManager.getTopicConfigTable().get(topic).getAttributes().get("bool.key")); + } + + @Test + public void testNormalUpdateUnchangeableKeyOnUpdating() { + if (notToBeExecuted()) { + return; + } + String topic = "testNormalUpdateUnchangeableKeyOnUpdating-" + System.currentTimeMillis(); + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", true, false), + new LongRangeAttribute("long.range.key", false, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+long.range.key", "14"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + attributes.put("+long.range.key", "16"); + topicConfig.setAttributes(attributes); + RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> topicConfigManager.updateTopicConfig(topicConfig)); + Assert.assertEquals("attempt to update an unchangeable attribute. key: long.range.key", runtimeException.getMessage()); + } + + @Test + public void testNormalQueryKeyOnGetting() { + if (notToBeExecuted()) { + return; + } + String topic = "testNormalQueryKeyOnGetting-" + System.currentTimeMillis(); + String unchangeable = "bool.key"; + + supportAttributes(asList( + new EnumAttribute("enum.key", true, newHashSet("enum-1", "enum-2", "enum-3"), "enum-1"), + new BooleanAttribute("bool.key", false, false), + new LongRangeAttribute("long.range.key", true, 10, 20, 15) + )); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("+" + unchangeable, "true"); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setAttributes(attributes); + + topicConfigManager.updateTopicConfig(topicConfig); + + TopicConfig topicConfigUpdated = topicConfigManager.getTopicConfigTable().get(topic); + Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(Optional.of(topicConfigUpdated))); + + Assert.assertEquals("true", topicConfigUpdated.getAttributes().get(unchangeable)); + } + + private void supportAttributes(List<Attribute> supportAttributes) { + Map<String, Attribute> supportedAttributes = new HashMap<>(); + + for (Attribute supportAttribute : supportAttributes) { + supportedAttributes.put(supportAttribute.getName(), supportAttribute); + } + + TopicAttributes.ALL.putAll(supportedAttributes); + } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } +} diff --git a/client/BUILD.bazel b/client/BUILD.bazel index e491cfcef..46e29452b 100644 --- a/client/BUILD.bazel +++ b/client/BUILD.bazel @@ -33,6 +33,7 @@ java_library( "@maven//:commons_collections_commons_collections", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_logback_classic", + "@maven//:com_google_guava_guava", ], ) diff --git a/common/BUILD.bazel b/common/BUILD.bazel index 831c85e3d..a95a19ccd 100644 --- a/common/BUILD.bazel +++ b/common/BUILD.bazel @@ -39,6 +39,7 @@ java_library( "@maven//:org_lz4_lz4_java", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_logback_classic", + "@maven//:io_github_aliyunmq_rocketmq_rocksdb", ], ) diff --git a/common/pom.xml b/common/pom.xml index 9796d1b2d..31eb0f087 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -104,5 +104,9 @@ <groupId>io.github.aliyunmq</groupId> <artifactId>rocketmq-logback-classic</artifactId> </dependency> + <dependency> + <groupId>io.github.aliyunmq</groupId> + <artifactId>rocketmq-rocksdb</artifactId> + </dependency> </dependencies> </project> diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index f712e1694..6c3bed47c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.common; import java.io.IOException; import java.util.Map; + +import org.apache.rocketmq.common.config.RocksDBConfigManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -25,7 +27,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - public abstract String encode(); + protected RocksDBConfigManager rocksDBConfigManager; public boolean load() { String fileName = null; @@ -46,8 +48,6 @@ public abstract class ConfigManager { } } - public abstract String configFilePath(); - private boolean loadBak() { String fileName = null; try { @@ -66,8 +66,6 @@ public abstract class ConfigManager { return true; } - public abstract void decode(final String jsonString); - public synchronized <T> void persist(String topicName, T t) { // stub for future this.persist(); @@ -90,5 +88,19 @@ public abstract class ConfigManager { } } + protected void decode0(final byte[] key, final byte[] body) { + + } + + public boolean stop() { + return true; + } + + public abstract String configFilePath(); + + public abstract String encode(); + public abstract String encode(final boolean prettyFormat); + + public abstract void decode(final String jsonString); } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java new file mode 100644 index 000000000..e3673baad --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactionOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.FlushOptions; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.Priority; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; +import org.rocksdb.Status; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import static org.rocksdb.RocksDB.NOT_FOUND; + +public abstract class AbstractRocksDBStorage { + protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); + + private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + private static final String SPACE = " | "; + + protected String dbPath; + protected boolean readOnly; + protected RocksDB db; + protected DBOptions options; + + protected WriteOptions writeOptions; + protected WriteOptions ableWalWriteOptions; + + protected ReadOptions readOptions; + protected ReadOptions totalOrderReadOptions; + + protected CompactionOptions compactionOptions; + protected CompactRangeOptions compactRangeOptions; + + protected ColumnFamilyHandle defaultCFHandle; + protected final List<ColumnFamilyOptions> cfOptions = new ArrayList(); + + protected volatile boolean loaded; + private volatile boolean closed; + + private final Semaphore reloadPermit = new Semaphore(1); + private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); + private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor( + 1, 1, 1000 * 60, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(1), + new ThreadFactoryImpl("RocksDBManualCompactionService_"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + static { + RocksDB.loadLibrary(); + } + + public boolean hold() { + if (!this.loaded || this.db == null || this.closed) { + LOGGER.error("hold rocksdb Failed. {}", this.dbPath); + return false; + } else { + return true; + } + } + + public void release() { + } + + protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final byte[] keyBytes, final int keyLen, + final byte[] valueBytes, final int valueLen) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.put(cfHandle, writeOptions, keyBytes, 0, keyLen, valueBytes, 0, valueLen); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("put Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.put(cfHandle, writeOptions, keyBB, valueBB); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("put Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void batchPut(WriteOptions writeOptions, final WriteBatch batch) throws RocksDBException { + try { + this.db.write(writeOptions, batch); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("batchPut Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + batch.clear(); + } + } + + protected byte[] get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, byte[] keyBytes) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.get(cfHandle, readOptions, keyBytes); + } catch (RocksDBException e) { + LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected boolean get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, + final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.get(cfHandle, readOptions, keyBB, valueBB) != NOT_FOUND; + } catch (RocksDBException e) { + LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected List<byte[]> multiGet(final ReadOptions readOptions, + final List<ColumnFamilyHandle> columnFamilyHandleList, + final List<byte[]> keys) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.multiGetAsList(readOptions, columnFamilyHandleList, keys); + } catch (RocksDBException e) { + LOGGER.error("multiGet Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, byte[] keyBytes) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.delete(cfHandle, writeOptions, keyBytes); + } catch (RocksDBException e) { + LOGGER.error("delete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, ByteBuffer keyBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.delete(cfHandle, writeOptions, keyBB); + } catch (RocksDBException e) { + LOGGER.error("delete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected WrappedRocksIterator newIterator(ColumnFamilyHandle cfHandle, ReadOptions readOptions) { + return new WrappedRocksIterator(this.db.newIterator(cfHandle, readOptions)); + } + + protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final byte[] startKey, final byte[] endKey) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.deleteRange(cfHandle, writeOptions, startKey, endKey); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("rangeDelete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void manualCompactionDefaultCfMaxLevel(final CompactionOptions compactionOptions) throws Exception { + final ColumnFamilyHandle defaultCFHandle = this.defaultCFHandle; + final byte[] defaultCFName = defaultCFHandle.getName(); + List<LiveFileMetaData> fileMetaDataList = this.db.getLiveFilesMetaData(); + if (fileMetaDataList == null || fileMetaDataList.isEmpty()) { + return; + } + + List<LiveFileMetaData> defaultLiveFileDataList = Lists.newArrayList(); + List<String> inputFileNames = Lists.newArrayList(); + int maxLevel = 0; + for (LiveFileMetaData fileMetaData : fileMetaDataList) { + if (compareTo(fileMetaData.columnFamilyName(), defaultCFName) != 0) { + continue; + } + defaultLiveFileDataList.add(fileMetaData); + if (fileMetaData.level() > maxLevel) { + maxLevel = fileMetaData.level(); + } + } + if (maxLevel == 0) { + LOGGER.info("manualCompactionDefaultCfFiles skip level 0."); + return; + } + + for (LiveFileMetaData fileMetaData : defaultLiveFileDataList) { + if (fileMetaData.level() != maxLevel || fileMetaData.beingCompacted()) { + continue; + } + inputFileNames.add(fileMetaData.path() + fileMetaData.fileName()); + } + if (!inputFileNames.isEmpty()) { + List<String> outputLists = this.db.compactFiles(compactionOptions, defaultCFHandle, + inputFileNames, maxLevel, -1, null); + LOGGER.info("manualCompactionDefaultCfFiles OK. src: {}, dst: {}", inputFileNames, outputLists); + } else { + LOGGER.info("manualCompactionDefaultCfFiles Empty."); + } + } + + protected void manualCompactionDefaultCfRange(CompactRangeOptions compactRangeOptions) { + if (!hold()) { + return; + } + long s1 = System.currentTimeMillis(); + boolean result = true; + try { + LOGGER.info("manualCompaction Start. {}", this.dbPath); + this.db.compactRange(this.defaultCFHandle, null, null, compactRangeOptions); + } catch (RocksDBException e) { + result = false; + scheduleReloadRocksdb(e); + LOGGER.error("manualCompaction Failed. {}, {}", this.dbPath, getStatusError(e)); + } finally { + release(); + LOGGER.info("manualCompaction End. {}, rt: {}(ms), result: {}", this.dbPath, System.currentTimeMillis() - s1, result); + } + } + + protected void manualCompaction(long minPhyOffset, final CompactRangeOptions compactRangeOptions) { + this.manualCompactionThread.submit(new Runnable() { + @Override + public void run() { + manualCompactionDefaultCfRange(compactRangeOptions); + } + }); + } + + protected void open(final List<ColumnFamilyDescriptor> cfDescriptors, + final List<ColumnFamilyHandle> cfHandles) throws RocksDBException { + if (this.readOnly) { + this.db = RocksDB.openReadOnly(this.options, this.dbPath, cfDescriptors, cfHandles); + } else { + this.db = RocksDB.open(this.options, this.dbPath, cfDescriptors, cfHandles); + } + this.db.getEnv().setBackgroundThreads(8, Priority.HIGH); + this.db.getEnv().setBackgroundThreads(8, Priority.LOW); + + if (this.db == null) { + throw new RocksDBException("open rocksdb null"); + } + } + + protected abstract boolean postLoad(); + + public synchronized boolean start() { + if (this.loaded) { + return true; + } + if (postLoad()) { + this.loaded = true; + LOGGER.info("start OK. {}", this.dbPath); + this.closed = false; + return true; + } else { + return false; + } + } + + protected abstract void preShutdown(); + + public synchronized boolean shutdown() { + try { + if (!this.loaded) { + return true; + } + + final FlushOptions flushOptions = new FlushOptions(); + flushOptions.setWaitForFlush(true); + try { + flush(flushOptions); + } finally { + flushOptions.close(); + } + this.db.cancelAllBackgroundWork(true); + this.db.pauseBackgroundWork(); + //The close order is matter. + //1. close column family handles + preShutdown(); + + this.defaultCFHandle.close(); + //2. close column family options. + for (final ColumnFamilyOptions opt : this.cfOptions) { + opt.close(); + } + //3. close options + if (this.writeOptions != null) { + this.writeOptions.close(); + } + if (this.ableWalWriteOptions != null) { + this.ableWalWriteOptions.close(); + } + if (this.readOptions != null) { + this.readOptions.close(); + } + if (this.totalOrderReadOptions != null) { + this.totalOrderReadOptions.close(); + } + if (this.options != null) { + this.options.close(); + } + //4. close db. + if (db != null) { + this.db.syncWal(); + this.db.closeE(); + } + //5. help gc. + this.cfOptions.clear(); + this.db = null; + this.readOptions = null; + this.totalOrderReadOptions = null; + this.writeOptions = null; + this.ableWalWriteOptions = null; + this.options = null; + + this.loaded = false; + LOGGER.info("shutdown OK. {}", this.dbPath); + } catch (Exception e) { + LOGGER.error("shutdown Failed. {}", this.dbPath, e); + return false; + } + return true; + } + + public void flush(final FlushOptions flushOptions) { + if (!this.loaded || this.readOnly || closed) { + return; + } + + try { + if (db != null) { + this.db.flush(flushOptions); + } + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("flush Failed. {}, {}", this.dbPath, getStatusError(e)); + } + } + + public Statistics getStatistics() { + return this.options.statistics(); + } + + public ColumnFamilyHandle getDefaultCFHandle() { + return defaultCFHandle; + } + + public List<LiveFileMetaData> getCompactionStatus() { + if (!hold()) { + return null; + } + try { + return this.db.getLiveFilesMetaData(); + } finally { + release(); + } + } + + private void scheduleReloadRocksdb(RocksDBException rocksDBException) { + if (rocksDBException == null || rocksDBException.getStatus() == null) { + return; + } + Status status = rocksDBException.getStatus(); + Status.Code code = status.getCode(); + // Status.Code.Incomplete == code + if (Status.Code.Aborted == code || Status.Code.Corruption == code || Status.Code.Undefined == code) { + LOGGER.error("scheduleReloadRocksdb. {}, {}", this.dbPath, getStatusError(rocksDBException)); + scheduleReloadRocksdb0(); + } + } + + private void scheduleReloadRocksdb0() { + if (!this.reloadPermit.tryAcquire()) { + return; + } + this.closed = true; + this.reloadScheduler.schedule(new Runnable() { + @Override + public void run() { + boolean result = true; + try { + reloadRocksdb(); + } catch (Exception e) { + result = false; + } finally { + reloadPermit.release(); + } + // try to reload rocksdb next time + if (!result) { + LOGGER.info("reload rocksdb Retry. {}", dbPath); + scheduleReloadRocksdb0(); + } + } + }, 10, TimeUnit.SECONDS); + } + + private void reloadRocksdb() throws Exception { + LOGGER.info("reload rocksdb Start. {}", this.dbPath); + if (!shutdown() || !start()) { + LOGGER.error("reload rocksdb Failed. {}", dbPath); + throw new Exception("reload rocksdb Error"); + } + LOGGER.info("reload rocksdb OK. {}", this.dbPath); + } + + public void flushWAL() throws RocksDBException { + this.db.flushWal(true); + } + + protected class WrappedRocksIterator { + private final RocksIterator iterator; + + public WrappedRocksIterator(final RocksIterator iterator) { + this.iterator = iterator; + } + + public byte[] key() { + return iterator.key(); + } + + public byte[] value() { + return iterator.value(); + } + + public void next() { + iterator.next(); + } + + public void prev() { + iterator.prev(); + } + + public void seek(byte[] target) { + iterator.seek(target); + } + + public void seekForPrev(byte[] target) { + iterator.seekForPrev(target); + } + + public void seekToFirst() { + iterator.seekToFirst(); + } + + public boolean isValid() { + return iterator.isValid(); + } + + public void close() { + iterator.close(); + } + } + + private String getStatusError(RocksDBException e) { + if (e == null || e.getStatus() == null) { + return "null"; + } + Status status = e.getStatus(); + StringBuilder sb = new StringBuilder(64); + sb.append("code: "); + if (status.getCode() != null) { + sb.append(status.getCode().name()); + } else { + sb.append("null"); + } + sb.append(", ").append("subCode: "); + if (status.getSubCode() != null) { + sb.append(status.getSubCode().name()); + } else { + sb.append("null"); + } + sb.append(", ").append("state: ").append(status.getState()); + return sb.toString(); + } + + public void statRocksdb(Logger logger) { + try { + + List<LiveFileMetaData> liveFileMetaDataList = this.getCompactionStatus(); + if (liveFileMetaDataList == null || liveFileMetaDataList.isEmpty()) { + return; + } + Map<Integer, StringBuilder> map = Maps.newHashMap(); + for (LiveFileMetaData metaData : liveFileMetaDataList) { + StringBuilder sb = map.get(metaData.level()); + if (sb == null) { + sb = new StringBuilder(256); + map.put(metaData.level(), sb); + } + sb.append(new String(metaData.columnFamilyName(), CHARSET_UTF8)).append(SPACE). + append(metaData.fileName()).append(SPACE). + append("s: ").append(metaData.size()).append(SPACE). + append("a: ").append(metaData.numEntries()).append(SPACE). + append("r: ").append(metaData.numReadsSampled()).append(SPACE). + append("d: ").append(metaData.numDeletions()).append(SPACE). + append(metaData.beingCompacted()).append("\n"); + } + for (Map.Entry<Integer, StringBuilder> entry : map.entrySet()) { + logger.info("level: {}\n{}", entry.getKey(), entry.getValue().toString()); + } + + String blockCacheMemUsage = this.db.getProperty("rocksdb.block-cache-usage"); + String indexesAndFilterBlockMemUsage = this.db.getProperty("rocksdb.estimate-table-readers-mem"); + String memTableMemUsage = this.db.getProperty("rocksdb.cur-size-all-mem-tables"); + String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); + logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, memtable: {}, blocksPinnedByIterator: {}", + blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); + } catch (Exception ignored) { + } + } + + public int compareTo(byte[] v1, byte[] v2) { + int len1 = v1.length; + int len2 = v2.length; + int lim = Math.min(len1, len2); + + int k = 0; + while (k < lim) { + byte c1 = v1[k]; + byte c2 = v2[k]; + if (c1 != c2) { + return c1 - c2; + } + k++; + } + return len1 - len2; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java new file mode 100644 index 000000000..9d05ed282 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction; +import org.rocksdb.CompactionOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.CompressionType; +import org.rocksdb.DBOptions; +import org.rocksdb.DataBlockIndexType; +import org.rocksdb.IndexType; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; +import org.rocksdb.RateLimiter; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.SkipListMemTableConfig; +import org.rocksdb.Statistics; +import org.rocksdb.StatsLevel; +import org.rocksdb.StringAppendOperator; +import org.rocksdb.WALRecoveryMode; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.rocksdb.util.SizeUnit; + +public class ConfigRocksDBStorage extends AbstractRocksDBStorage { + + public ConfigRocksDBStorage(final String dbPath) { + super(); + this.dbPath = dbPath; + this.readOnly = false; + } + + private void initOptions() { + this.options = createConfigDBOptions(); + + this.writeOptions = new WriteOptions(); + this.writeOptions.setSync(false); + this.writeOptions.setDisableWAL(true); + this.writeOptions.setNoSlowdown(true); + + this.ableWalWriteOptions = new WriteOptions(); + this.ableWalWriteOptions.setSync(false); + this.ableWalWriteOptions.setDisableWAL(false); + this.ableWalWriteOptions.setNoSlowdown(true); + + this.readOptions = new ReadOptions(); + this.readOptions.setPrefixSameAsStart(true); + this.readOptions.setTotalOrderSeek(false); + this.readOptions.setTailing(false); + + this.totalOrderReadOptions = new ReadOptions(); + this.totalOrderReadOptions.setPrefixSameAsStart(false); + this.totalOrderReadOptions.setTotalOrderSeek(false); + this.totalOrderReadOptions.setTailing(false); + + this.compactRangeOptions = new CompactRangeOptions(); + this.compactRangeOptions.setBottommostLevelCompaction(BottommostLevelCompaction.kForce); + this.compactRangeOptions.setAllowWriteStall(true); + this.compactRangeOptions.setExclusiveManualCompaction(false); + this.compactRangeOptions.setChangeLevel(true); + this.compactRangeOptions.setTargetLevel(-1); + this.compactRangeOptions.setMaxSubcompactions(4); + + this.compactionOptions = new CompactionOptions(); + this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION); + this.compactionOptions.setMaxSubcompactions(4); + this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L); + } + + @Override + protected boolean postLoad() { + try { + UtilAll.ensureDirOK(this.dbPath); + + initOptions(); + + final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList(); + + ColumnFamilyOptions defaultOptions = createConfigOptions(); + this.cfOptions.add(defaultOptions); + cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); + + final List<ColumnFamilyHandle> cfHandles = new ArrayList(); + open(cfDescriptors, cfHandles); + + this.defaultCFHandle = cfHandles.get(0); + } catch (final Exception e) { + AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e); + return false; + } + return true; + } + + @Override + protected void preShutdown() { + + } + + private ColumnFamilyOptions createConfigOptions() { + BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig(). + setFormatVersion(5). + setIndexType(IndexType.kBinarySearch). + setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch). + setBlockSize(32 * SizeUnit.KB). + setFilterPolicy(new BloomFilter(16, false)). + // Indicating if we'd put index/filter blocks to the block cache. + setCacheIndexAndFilterBlocks(false). + setCacheIndexAndFilterBlocksWithHighPriority(true). + setPinL0FilterAndIndexBlocksInCache(false). + setPinTopLevelIndexAndFilter(true). + setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)). + setWholeKeyFiltering(true); + + ColumnFamilyOptions options = new ColumnFamilyOptions(); + return options.setMaxWriteBufferNumber(2). + // MemTable size, memtable(cache) -> immutable memtable(cache) -> sst(disk) + setWriteBufferSize(8 * SizeUnit.MB). + setMinWriteBufferNumberToMerge(1). + setTableFormatConfig(blockBasedTableConfig). + setMemTableConfig(new SkipListMemTableConfig()). + setCompressionType(CompressionType.NO_COMPRESSION). + setNumLevels(7). + setCompactionStyle(CompactionStyle.LEVEL). + setLevel0FileNumCompactionTrigger(4). + setLevel0SlowdownWritesTrigger(8). + setLevel0StopWritesTrigger(12). + // The target file size for compaction. + setTargetFileSizeBase(64 * SizeUnit.MB). + setTargetFileSizeMultiplier(2). + // The upper-bound of the total size of L1 files in bytes + setMaxBytesForLevelBase(256 * SizeUnit.MB). + setMaxBytesForLevelMultiplier(2). + setMergeOperator(new StringAppendOperator()). + setInplaceUpdateSupport(true); + } + + private DBOptions createConfigDBOptions() { + //Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide + // and http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java + DBOptions options = new DBOptions(); + Statistics statistics = new Statistics(); + statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS); + return options. + setDbLogDir(getDBLogDir()). + setInfoLogLevel(InfoLogLevel.INFO_LEVEL). + setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords). + setManualWalFlush(true). + setMaxTotalWalSize(500 * SizeUnit.MB). + setWalSizeLimitMB(0). + setWalTtlSeconds(0). + setCreateIfMissing(true). + setCreateMissingColumnFamilies(true). + setMaxOpenFiles(-1). + setMaxLogFileSize(1 * SizeUnit.GB). + setKeepLogFileNum(5). + setMaxManifestFileSize(1 * SizeUnit.GB). + setAllowConcurrentMemtableWrite(false). + setStatistics(statistics). + setStatsDumpPeriodSec(600). + setAtomicFlush(true). + setMaxBackgroundJobs(32). + setMaxSubcompactions(4). + setParanoidChecks(true). + setDelayedWriteRate(16 * SizeUnit.MB). + setRateLimiter(new RateLimiter(100 * SizeUnit.MB)). + setUseDirectIoForFlushAndCompaction(true). + setUseDirectReads(true); + } + + private static String getDBLogDir() { + String rootPath = System.getProperty("user.home"); + if (StringUtils.isEmpty(rootPath)) { + return ""; + } + rootPath = rootPath + File.separator + "logs"; + UtilAll.ensureDirOK(rootPath); + return rootPath + File.separator + "rocketmqlogs" + File.separator; + } + + public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception { + put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length); + } + + public void put(final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception { + put(this.defaultCFHandle, this.ableWalWriteOptions, keyBB, valueBB); + } + + public byte[] get(final byte[] keyBytes) throws Exception { + return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes); + } + + public void delete(final byte[] keyBytes) throws Exception { + delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes); + } + + public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final List<byte[]> keys) throws + RocksDBException { + return multiGet(this.totalOrderReadOptions, cfhList, keys); + } + + public void batchPut(final WriteBatch batch) throws RocksDBException { + batchPut(this.writeOptions, batch); + } + + public void batchPutWithWal(final WriteBatch batch) throws RocksDBException { + batchPut(this.ableWalWriteOptions, batch); + } + + public RocksIterator iterator() { + return this.db.newIterator(this.defaultCFHandle, this.totalOrderReadOptions); + } + + public void rangeDelete(final byte[] startKey, final byte[] endKey) throws RocksDBException { + rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey); + } + + public RocksIterator iterator(ReadOptions readOptions) { + return this.db.newIterator(this.defaultCFHandle, readOptions); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java new file mode 100644 index 000000000..f958bbdf0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.util.function.BiConsumer; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; + +public class RocksDBConfigManager { + protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + protected volatile boolean isStop = false; + protected ConfigRocksDBStorage configRocksDBStorage = null; + private FlushOptions flushOptions = null; + private volatile long lastFlushMemTableMicroSecond = 0; + private final long memTableFlushInterval; + + public RocksDBConfigManager(long memTableFlushInterval) { + this.memTableFlushInterval = memTableFlushInterval; + } + + public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) { + this.isStop = false; + this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath); + if (!this.configRocksDBStorage.start()) { + return false; + } + RocksIterator iterator = this.configRocksDBStorage.iterator(); + try { + iterator.seekToFirst(); + while (iterator.isValid()) { + biConsumer.accept(iterator.key(), iterator.value()); + iterator.next(); + } + } finally { + iterator.close(); + } + + this.flushOptions = new FlushOptions(); + this.flushOptions.setWaitForFlush(false); + this.flushOptions.setAllowWriteStall(false); + return true; + } + + public void start() { + } + + public boolean stop() { + this.isStop = true; + if (this.configRocksDBStorage != null) { + return this.configRocksDBStorage.shutdown(); + } + if (this.flushOptions != null) { + this.flushOptions.close(); + } + return true; + } + + public void flushWAL() { + try { + if (this.isStop) { + return; + } + if (this.configRocksDBStorage != null) { + this.configRocksDBStorage.flushWAL(); + + long now = System.currentTimeMillis(); + if (now > this.lastFlushMemTableMicroSecond + this.memTableFlushInterval) { + this.configRocksDBStorage.flush(this.flushOptions); + this.lastFlushMemTableMicroSecond = now; + } + } + } catch (Exception e) { + BROKER_LOG.error("kv flush WAL Failed.", e); + } + } + + public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception { + this.configRocksDBStorage.put(keyBytes, keyLen, valueBytes); + } + + public void delete(final byte[] keyBytes) throws Exception { + this.configRocksDBStorage.delete(keyBytes); + } + + public void batchPutWithWal(final WriteBatch batch) throws Exception { + this.configRocksDBStorage.batchPutWithWal(batch); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index c1176ea15..cb04b00b3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -51,4 +51,5 @@ public class LoggerName { public static final String PROXY_LOGGER_NAME = "RocketmqProxy"; public static final String PROXY_WATER_MARK_LOGGER_NAME = "RocketmqProxyWatermark"; public static final String ROCKETMQ_COLDCTR_LOGGER_NAME = "RocketmqColdCtr"; + public static final String ROCKSDB_LOGGER_NAME = "RocketmqRocksDB"; } diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index b104016fb..41c9eedd9 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -49,8 +49,7 @@ public class Consumer { * } * </pre> */ - // Uncomment the following line while debugging, namesrvAddr should be set to your local address -// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); /* * Specify where to start in case the specific consumer group is a brand-new one. diff --git a/pom.xml b/pom.xml index 4d5dd1dec..3a08d75f2 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ <opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version> <jul-to-slf4j.version>2.0.6</jul-to-slf4j.version> <s3.version>2.20.29</s3.version> + <rocksdb.version>1.0.3</rocksdb.version> <jackson-databind.version>2.13.4.2</jackson-databind.version> <!-- Test dependencies --> @@ -711,6 +712,11 @@ <artifactId>slf4j-api</artifactId> <version>${slf4j-api.version}</version> </dependency> + <dependency> + <groupId>io.github.aliyunmq</groupId> + <artifactId>rocketmq-rocksdb</artifactId> + <version>${rocksdb.version}</version> + </dependency> <dependency> <groupId>io.github.aliyunmq</groupId> <artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId> diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel index e3e1bce3b..db8b24301 100644 --- a/remoting/BUILD.bazel +++ b/remoting/BUILD.bazel @@ -38,6 +38,7 @@ java_library( "@maven//:org_apache_commons_commons_lang3", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_logback_classic", + "@maven//:commons_collections_commons_collections", ], ) diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreType.java b/store/src/main/java/org/apache/rocketmq/store/StoreType.java new file mode 100644 index 000000000..4f9c4d0e4 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/StoreType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +public enum StoreType { + DEFAULT("default"), + DEFAULT_ROCKSDB("defaultRocksDB"); + + private String storeType; + + StoreType(String storeType) { + this.storeType = storeType; + } + + public String getStoreType() { + return storeType; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 4f204d742..efb728ac0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -20,6 +20,7 @@ import java.io.File; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.queue.BatchConsumeQueue; public class MessageStoreConfig { @@ -102,6 +103,9 @@ public class MessageStoreConfig { private int timerMetricSmallThreshold = 1000000; private int timerProgressLogIntervalMs = 10 * 1000; + // default, defaultRocksDB + @ImportantField + private String storeType = StoreType.DEFAULT.getStoreType(); // ConsumeQueue file size,default is 30W private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; // enable consume queue ext @@ -392,6 +396,11 @@ public class MessageStoreConfig { private int batchDispatchRequestThreadPoolNums = 16; + // rocksdb mode + private boolean realTimePersistRocksDBConfig = true; + private long memTableFlushInterval = 60 * 60 * 1000L; + private boolean enableRocksDBLog = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -488,6 +497,14 @@ public class MessageStoreConfig { this.mappedFileSizeCommitLog = mappedFileSizeCommitLog; } + public String getStoreType() { + return storeType; + } + + public void setStoreType(String storeType) { + this.storeType = storeType; + } + public int getMappedFileSizeConsumeQueue() { int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); @@ -1710,4 +1727,28 @@ public class MessageStoreConfig { public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) { this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums; } + + public boolean isRealTimePersistRocksDBConfig() { + return realTimePersistRocksDBConfig; + } + + public void setRealTimePersistRocksDBConfig(boolean realTimePersistRocksDBConfig) { + this.realTimePersistRocksDBConfig = realTimePersistRocksDBConfig; + } + + public long getMemTableFlushInterval() { + return memTableFlushInterval; + } + + public void setMemTableFlushInterval(long memTableFlushInterval) { + this.memTableFlushInterval = memTableFlushInterval; + } + + public boolean isEnableRocksDBLog() { + return enableRocksDBLog; + } + + public void setEnableRocksDBLog(boolean enableRocksDBLog) { + this.enableRocksDBLog = enableRocksDBLog; + } } diff --git a/test/BUILD.bazel b/test/BUILD.bazel index 058532df7..5df71200c 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -128,6 +128,9 @@ GenTestRules( "src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT", "src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT", ], + flaky_tests = [ + "src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT", + ], test_files = glob(["src/test/java/**/*IT.java"]), deps = [ ":tests", diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel index bc7d8f938..5b3885a4e 100644 --- a/tieredstore/BUILD.bazel +++ b/tieredstore/BUILD.bazel @@ -66,6 +66,7 @@ java_library( "@maven//:com_google_guava_guava", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge", + "@maven//:net_java_dev_jna_jna", ], ) -- 2.32.0.windows.2 From 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Wed, 9 Aug 2023 16:11:37 +0800 Subject: [PATCH 2/4] [ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135) --- .../rocketmq/acl/plain/PlainAccessControlFlowTest.java | 5 +++++ .../apache/rocketmq/acl/plain/PlainAccessValidatorTest.java | 3 +++ .../rocketmq/acl/plain/PlainPermissionManagerTest.java | 3 +++ 3 files changed, 11 insertions(+) diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java index 519345714..e7fd0932f 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -43,6 +44,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; + /** * <p> In this class, we'll test the following scenarios, each containing several consecutive operations on ACL, * <p> like updating and deleting ACL, changing config files and checking validations. @@ -50,6 +52,9 @@ import java.util.List; * <p> Case 2: Only conf/acl/plain_acl.yml exists; * <p> Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists. */ + +// Ignore this test case as it is currently unable to pass on ubuntu workflow +@Ignore public class PlainAccessControlFlowTest { public static final String DEFAULT_TOPIC = "topic-acl"; diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index ef0cffbdc..a3a925758 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -56,8 +56,11 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +// Ignore this test case as it is currently unable to pass on ubuntu workflow +@Ignore public class PlainAccessValidatorTest { private PlainAccessValidator plainAccessValidator; diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java index 941d8c779..aa7539f3a 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -41,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +// Ignore this test case as it is currently unable to pass on ubuntu workflow +@Ignore public class PlainPermissionManagerTest { PlainPermissionManager plainPermissionManager; -- 2.32.0.windows.2 From 04683ec05808d63f742f8702a9bd3a2fb846c154 Mon Sep 17 00:00:00 2001 From: lk <xdkxlk@outlook.com> Date: Wed, 9 Aug 2023 19:08:33 +0800 Subject: [PATCH 3/4] [ISSUE 7117] check message is in memory or not when init consumer offset for pop (#7118) --- .../broker/processor/AckMessageProcessor.java | 1 - .../broker/processor/PopMessageProcessor.java | 40 ++++++++++++------- .../apache/rocketmq/common/BrokerConfig.java | 9 +++++ .../service/route/TopicRouteService.java | 2 +- 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 2140aa881..687811409 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -308,7 +308,6 @@ public class AckMessageProcessor implements NettyRequestProcessor { && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { POP_LOGGER.error("put ack msg error:" + putMessageResult); } - System.out.printf("put ack to store %s", ackMsg); PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 53e172561..441f7de08 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -639,20 +639,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); if (offset < 0) { - if (ConsumeInitMode.MIN == initMode) { - offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); - } else { - // pop last one,then commit offset. - offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; - // max & no consumer offset - if (offset < 0) { - offset = 0; - } - if (init) { - this.brokerController.getConsumerOffsetManager().commitOffset( - "getPopOffset", group, topic, queueId, offset); - } - } + offset = this.getInitOffset(topic, group, queueId, initMode, init); } if (checkResetOffset) { @@ -670,6 +657,31 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } + private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) { + long offset; + if (ConsumeInitMode.MIN == initMode) { + return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); + } else { + if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && + this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 && + this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, queueId, 0, 1)) { + offset = 0; + } else { + // pop last one,then commit offset. + offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; + // max & no consumer offset + if (offset < 0) { + offset = 0; + } + } + if (init) { + this.brokerController.getConsumerOffsetManager().commitOffset( + "getPopOffset", group, topic, queueId, offset); + } + } + return offset; + } + public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 02c692e2b..a815636b1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity { private int popCkOffsetMaxQueueSize = 20000; private boolean enablePopBatchAck = false; private boolean enableNotifyAfterPopOrderLockRelease = true; + private boolean initPopOffsetByCheckMsgInMem = true; private boolean realTimeNotifyConsumerChange = true; @@ -1264,6 +1265,14 @@ public class BrokerConfig extends BrokerIdentity { this.enableNotifyAfterPopOrderLockRelease = enableNotifyAfterPopOrderLockRelease; } + public boolean isInitPopOffsetByCheckMsgInMem() { + return initPopOffsetByCheckMsgInMem; + } + + public void setInitPopOffsetByCheckMsgInMem(boolean initPopOffsetByCheckMsgInMem) { + this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem; + } + public boolean isRealTimeNotifyConsumerChange() { return realTimeNotifyConsumerChange; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index b6b14faa4..e012a5465 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -133,7 +133,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); - log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); + log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE; -- 2.32.0.windows.2 From bcba5a8e628e35086c699852388990ba8a4bdcf8 Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Thu, 10 Aug 2023 10:19:34 +0800 Subject: [PATCH 4/4] [ISSUE #7146] Log output error needs to be corrected (#7147) --- .../org/apache/rocketmq/broker/out/BrokerOuterAPI.java | 8 ++++---- .../org/apache/rocketmq/example/quickstart/Consumer.java | 3 ++- .../org/apache/rocketmq/example/quickstart/Producer.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 1793a83c0..ae81e8b11 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -654,9 +654,9 @@ public class BrokerOuterAPI { try { RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; - LOGGER.info("Register single topic %s to broker %s with response code %s", topic, brokerName, response.getCode()); + LOGGER.info("Register single topic {} to broker {} with response code {}", topic, brokerName, response.getCode()); } catch (Exception e) { - LOGGER.warn(String.format("Register single topic %s to broker %s exception", topic, brokerName), e); + LOGGER.warn("Register single topic {} to broker {} exception", topic, brokerName, e); } finally { countDownLatch.countDown(); } @@ -722,10 +722,10 @@ public class BrokerOuterAPI { default: break; } - LOGGER.warn("Query data version from name server {} OK, changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion); + LOGGER.warn("Query data version from name server {} OK, changed {}, broker {}, name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion); } catch (Exception e) { changedList.add(Boolean.TRUE); - LOGGER.error("Query data version from name server {} Exception, {}", namesrvAddr, e); + LOGGER.error("Query data version from name server {} exception", namesrvAddr, e); } finally { countDownLatch.countDown(); } diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index 41c9eedd9..3a101bf66 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -49,7 +49,8 @@ public class Consumer { * } * </pre> */ - consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + // Uncomment the following line while debugging, namesrvAddr should be set to your local address + // consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); /* * Specify where to start in case the specific consumer group is a brand-new one. diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 2c67e463e..aac295030 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -54,7 +54,7 @@ public class Producer { * </pre> */ // Uncomment the following line while debugging, namesrvAddr should be set to your local address - producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); /* * Launch the instance. -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2