Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch043-backport-fix-some-bug...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch043-backport-fix-some-bugs.patch of Package rocketmq
From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Mon, 4 Dec 2023 16:13:07 +0800 Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can not destroy (#7612) --- .../rocketmq/tieredstore/index/IndexStoreFile.java | 2 -- .../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 52a686f68..def5c8f2d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile { this.fileStatus.set(IndexStatusEnum.SHUTDOWN); if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); - this.mappedFile = null; } if (this.compactMappedFile != null) { this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); - this.compactMappedFile = null; } } catch (Exception e) { log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 14608aa58..e99ea0de1 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.logfile.DefaultMappedFile; @@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService { private void recover() { Stopwatch stopwatch = Stopwatch.createStarted(); + // delete compact file directory + UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(), + FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString())); + // recover local File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString()); this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString()); @@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService { for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) { IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment); + IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp()); + if (localFile != null) { + localFile.destroy(); + } timeStoreTable.put(indexFile.getTimestamp(), indexFile); log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp()); } @@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService { if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) { log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}", indexFile.getTimestamp(), indexFile.getFileStatus()); + indexFile.destroy(); return; } -- 2.32.0.windows.2 From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001 From: gaoyf <gaoyf@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:25:22 +0800 Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603) * fix NullPointerException when message escape to remote * fix NumberFormatException when message retry to escape to remote * fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted * Use properties copies instead of referencing the same map when converting message --- .../org/apache/rocketmq/broker/BrokerController.java | 1 + .../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++- .../rocketmq/common/message/MessageAccessor.java | 7 +++++++ .../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9f1fd0ad0..8d29d4438 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -2108,6 +2108,7 @@ public class BrokerController { isScheduleServiceStart = shouldStart; if (timerMessageStore != null) { + timerMessageStore.syncLastReadTimeMs(); timerMessageStore.setShouldRunningDequeue(shouldStart); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 53cdecdf8..7f802adb9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -215,11 +215,13 @@ public class SlaveSynchronize { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { - if (null != brokerController.getMessageStore().getTimerMessageStore()) { + if (null != brokerController.getMessageStore().getTimerMessageStore() && + !brokerController.getTimerMessageStore().isShouldRunningDequeue()) { TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak); if (null != this.brokerController.getTimerCheckpoint()) { this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs()); this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset()); + this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion()); } } } catch (Exception e) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java index 1b7e2bba3..62e3bbd7e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.common.message; +import java.util.HashMap; import java.util.Map; public class MessageAccessor { @@ -96,4 +97,10 @@ public class MessageAccessor { return newMsg; } + public static Map<String, String> deepCopyProperties(Map<String, String> properties) { + if (properties == null) { + return null; + } + return new HashMap<>(properties); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index d796e4467..872cd7105 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -602,6 +602,10 @@ public class TimerMessageStore { this.shouldRunningDequeue = shouldRunningDequeue; } + public boolean isShouldRunningDequeue() { + return shouldRunningDequeue; + } + public void addMetric(MessageExt msg, int value) { try { if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) { @@ -1084,8 +1088,10 @@ public class TimerMessageStore { case PUT_OK: if (brokerStatsManager != null) { this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); - this.brokerStatsManager.incTopicPutSize(message.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); + if (putMessageResult.getAppendMessageResult() != null) { + this.brokerStatsManager.incTopicPutSize(message.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + } this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); } return PUT_OK; @@ -1119,7 +1125,7 @@ public class TimerMessageStore { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); - MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties())); TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2