Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch015-backport-fix-some-bug...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch015-backport-fix-some-bugs.patch of Package rocketmq
From bd0e9c09db9748f7f74a0c707579142dccf30afc Mon Sep 17 00:00:00 2001 From: PiteXChen <44110731+RapperCL@users.noreply.github.com> Date: Tue, 29 Aug 2023 19:39:27 +0800 Subject: [PATCH 1/7] [ISSUE #7111] Remove responseFuture from the responseTable when exception occurs (#7112) * remove responseFuture when exception * Empty-Commit --------- Co-authored-by: chenyong152 <chenyong152@midea.com> --- .../apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 1 + 1 file changed, 1 insertion(+) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 44d6a3df4..fce2de267 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -529,6 +529,7 @@ public abstract class NettyRemotingAbstract { log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); }); } catch (Exception e) { + responseTable.remove(opaque); responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); -- 2.32.0.windows.2 From c78061bf6ca5f35452510ec4107c46735c51c316 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 30 Aug 2023 22:29:51 +0800 Subject: [PATCH 2/7] [ISSUE#7280] Fix and refactor handle commit exception in tiered storage (#7281) * refactor handle commit exception * refactor handle commit exception * fix handle commit exception --- .../tieredstore/TieredDispatcher.java | 3 +- .../tieredstore/TieredMessageFetcher.java | 57 ++-- .../tieredstore/TieredMessageStore.java | 26 +- .../provider/TieredFileSegment.java | 291 ++++++++++-------- .../provider/TieredStoreProvider.java | 8 +- .../provider/posix/PosixFileSegment.java | 4 +- .../CommitLogInputStream.java} | 30 +- .../FileSegmentInputStream.java} | 49 ++- .../FileSegmentInputStreamFactory.java} | 26 +- .../tieredstore/TieredMessageStoreTest.java | 14 +- .../tieredstore/file/TieredFlatFileTest.java | 2 + .../tieredstore/file/TieredIndexFileTest.java | 2 + ...m.java => MockFileSegmentInputStream.java} | 8 +- .../TieredFileSegmentInputStreamTest.java | 24 +- .../provider/TieredFileSegmentTest.java | 89 +++++- .../provider/memory/MemoryFileSegment.java | 27 +- .../memory/MemoryFileSegmentWithoutCheck.java | 4 +- 17 files changed, 427 insertions(+), 237 deletions(-) rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredCommitLogInputStream.java => stream/CommitLogInputStream.java} (88%) rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStream.java => stream/FileSegmentInputStream.java} (77%) rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStreamFactory.java => stream/FileSegmentInputStreamFactory.java} (54%) rename tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/{MockTieredFileSegmentInputStream.java => MockFileSegmentInputStream.java} (82%) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 1746190cd..430c2b62e 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch continue; case FILE_CLOSED: tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue()); - logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " + - "topic: {}, queueId: {}", topic, queueId); + logger.info("File has been closed and destroy, topic: {}, queueId: {}", topic, queueId); return; default: dispatchOffset--; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index 9a9a3e5a5..766ff64f6 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -273,15 +273,17 @@ public class TieredMessageFetcher implements MessageStoreFetcher { TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes); } - // if no cached message found and there is currently an inflight request, wait for the request to end before continuing + // If there are no messages in the cache and there are currently requests being pulled. + // We need to wait for the request to return before continuing. if (resultWrapperList.isEmpty() && waitInflightRequest) { - CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxCount) - .getFuture(queueOffset); + CompletableFuture<Long> future = + flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset); if (!future.isDone()) { Stopwatch stopwatch = Stopwatch.createStarted(); // to prevent starvation issues, only allow waiting for inflight request once return future.thenCompose(v -> { - LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms", + stopwatch.elapsed(TimeUnit.MILLISECONDS)); return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false); }); } @@ -302,7 +304,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests if (!resultWrapperList.isEmpty()) { - LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", + LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + + "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); @@ -316,8 +319,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher { } // if cache is miss, immediately pull messages - LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}", + LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + + "topic: {}, queue: {}, queue offset: {}, max message num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); + CompletableFuture<GetMessageResult> resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); @@ -453,42 +458,42 @@ public class TieredMessageFetcher implements MessageStoreFetcher { public CompletableFuture<GetMessageResult> getMessageAsync( String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) { + GetMessageResult result = new GetMessageResult(); CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId)); + if (flatFile == null) { - GetMessageResult result = new GetMessageResult(); result.setNextBeginOffset(queueOffset); result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE); return CompletableFuture.completedFuture(result); } - GetMessageResult result = new GetMessageResult(); - long minQueueOffset = flatFile.getConsumeQueueMinOffset(); - long maxQueueOffset = flatFile.getConsumeQueueCommitOffset(); - result.setMinOffset(minQueueOffset); - result.setMaxOffset(maxQueueOffset); + // Max queue offset means next message put position + result.setMinOffset(flatFile.getConsumeQueueMinOffset()); + result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + + // Fill result according file offset. + // Offset range | Result | Fix to + // (-oo, 0] | no message | current offset + // (0, min) | too small | min offset + // [min, max) | correct | + // [max, max] | overflow one | max offset + // (max, +oo) | overflow badly | max offset - if (flatFile.getConsumeQueueCommitOffset() <= 0) { + if (result.getMaxOffset() <= 0) { result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); result.setNextBeginOffset(queueOffset); return CompletableFuture.completedFuture(result); - } - - // request range | result - // (0, min) | too small - // [min, max) | correct - // [max, max] | overflow one - // (max, +oo) | overflow badly - if (queueOffset < minQueueOffset) { + } else if (queueOffset < result.getMinOffset()) { result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL); - result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset()); + result.setNextBeginOffset(result.getMinOffset()); return CompletableFuture.completedFuture(result); - } else if (queueOffset == maxQueueOffset) { + } else if (queueOffset == result.getMaxOffset()) { result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE); - result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); + result.setNextBeginOffset(result.getMaxOffset()); return CompletableFuture.completedFuture(result); - } else if (queueOffset > maxQueueOffset) { + } else if (queueOffset > result.getMaxOffset()) { result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); - result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); + result.setNextBeginOffset(result.getMaxOffset()); return CompletableFuture.completedFuture(result); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 5240ac8e9..78e855f36 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -99,11 +99,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore { return storeConfig; } - public boolean viaTieredStorage(String topic, int queueId, long offset) { - return viaTieredStorage(topic, queueId, offset, 1); + public boolean fetchFromCurrentStore(String topic, int queueId, long offset) { + return fetchFromCurrentStore(topic, queueId, offset, 1); } - public boolean viaTieredStorage(String topic, int queueId, long offset, int batchSize) { + public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int batchSize) { TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = storeConfig.getTieredStorageLevel(); if (deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) { @@ -146,8 +146,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore { public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { - if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) { - logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); + if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { + logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); + } else { + logger.trace("GetMessageAsync from next store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); } @@ -168,14 +170,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore { if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); - logger.debug("GetMessageAsync not found then try back to next store, result: {}, " + + logger.debug("GetMessageAsync not found, then back to next store, result: {}, " + "topic: {}, queue: {}, queue offset: {}, offset range: {}-{}", result.getStatus(), topic, queueId, offset, result.getMinOffset(), result.getMaxOffset()); return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); } } - // system topic + // Fetch system topic data from the broker when using the force level. if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); @@ -198,7 +200,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes); } - // fix min or max offset according next store + // Fix min or max offset according next store at last long minOffsetInQueue = next.getMinOffsetInQueue(topic, queueId); if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { result.setMinOffset(minOffsetInQueue); @@ -209,7 +211,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { } return result; }).exceptionally(e -> { - logger.error("GetMessageAsync from tiered store failed: ", e); + logger.error("GetMessageAsync from tiered store failed", e); return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); }); } @@ -251,7 +253,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (time < 0) { - logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}", + logger.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}", topic, queueId); return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1; } @@ -262,7 +264,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { @Override public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId, long consumeQueueOffset) { - if (viaTieredStorage(topic, queueId, consumeQueueOffset)) { + if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) { Stopwatch stopwatch = Stopwatch.createStarted(); return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset) .thenApply(time -> { @@ -272,7 +274,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (time == -1) { - logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}", + logger.debug("GetEarliestMessageTimeAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}", topic, queueId, consumeQueueOffset); return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java index 5062c7d9e..32911a6e8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java @@ -16,14 +16,11 @@ */ package org.apache.rocketmq.tieredstore.provider; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -35,8 +32,8 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; import org.apache.rocketmq.tieredstore.file.TieredIndexFile; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> protected final TieredMessageStoreConfig storeConfig; private final long maxSize; - private final ReentrantLock bufferLock; - private final Semaphore commitLock; + private final ReentrantLock bufferLock = new ReentrantLock(); + private final Semaphore commitLock = new Semaphore(1); - private volatile boolean full; - private volatile boolean closed; + private volatile boolean full = false; + private volatile boolean closed = false; - private volatile long minTimestamp; - private volatile long maxTimestamp; - private volatile long commitPosition; - private volatile long appendPosition; + private volatile long minTimestamp = Long.MAX_VALUE; + private volatile long maxTimestamp = Long.MAX_VALUE; + private volatile long commitPosition = 0L; + private volatile long appendPosition = 0L; // only used in commitLog - private volatile long dispatchCommitOffset = 0; + private volatile long dispatchCommitOffset = 0L; private ByteBuffer codaBuffer; - private List<ByteBuffer> uploadBufferList = new ArrayList<>(); + private List<ByteBuffer> bufferList = new ArrayList<>(); + private FileSegmentInputStream fileSegmentInputStream; private CompletableFuture<Boolean> flightCommitRequest = CompletableFuture.completedFuture(false); public TieredFileSegment(TieredMessageStoreConfig storeConfig, @@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> this.fileType = fileType; this.filePath = filePath; this.baseOffset = baseOffset; - - this.closed = false; - this.bufferLock = new ReentrantLock(); - this.commitLock = new Semaphore(1); - - this.commitPosition = 0L; - this.appendPosition = 0L; - this.minTimestamp = Long.MAX_VALUE; - this.maxTimestamp = Long.MAX_VALUE; - - // The max segment size of a file is determined by the file type - this.maxSize = getMaxSizeAccordingFileType(storeConfig); + this.maxSize = getMaxSizeByFileType(); } - private long getMaxSizeAccordingFileType(TieredMessageStoreConfig storeConfig) { + /** + * The max segment size of a file is determined by the file type + */ + protected long getMaxSizeByFileType() { switch (fileType) { case COMMIT_LOG: return storeConfig.getTieredStoreCommitLogMaxSize(); @@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> this.appendPosition = pos; } - private List<ByteBuffer> rollingUploadBuffer() { + private List<ByteBuffer> borrowBuffer() { bufferLock.lock(); try { - List<ByteBuffer> tmp = uploadBufferList; - uploadBufferList = new ArrayList<>(); + List<ByteBuffer> tmp = bufferList; + bufferList = new ArrayList<>(); return tmp; } finally { bufferLock.unlock(); } } - private void sendBackBuffer(TieredFileSegmentInputStream inputStream) { - bufferLock.lock(); - try { - List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList(); - for (ByteBuffer buffer : tmpBufferList) { - buffer.rewind(); - } - tmpBufferList.addAll(uploadBufferList); - uploadBufferList = tmpBufferList; - if (inputStream.getCodaBuffer() != null) { - codaBuffer.rewind(); - } - } finally { - bufferLock.unlock(); - } - } - @SuppressWarnings("NonAtomicOperationOnVolatileField") - public AppendResult append(ByteBuffer byteBuf, long timeStamp) { + public AppendResult append(ByteBuffer byteBuf, long timestamp) { if (closed) { return AppendResult.FILE_CLOSED; } + bufferLock.lock(); try { if (full || codaBuffer != null) { @@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION); maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION); appendPosition += byteBuf.remaining(); - uploadBufferList.add(byteBuf); + // IndexFile is large and not change after compaction, no need deep copy + bufferList.add(byteBuf); setFull(); return AppendResult.SUCCESS; } @@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> setFull(); return AppendResult.FILE_FULL; } - if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount() + + if (bufferList.size() > storeConfig.getTieredStoreGroupCommitCount() || appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) { commitAsync(); } - if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { - logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}", - getPath(), uploadBufferList.size()); + + if (bufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { + logger.debug("File segment append buffer full, file: {}, buffer size: {}, pending bytes: {}", + getPath(), bufferList.size(), appendPosition - commitPosition); return AppendResult.BUFFER_FULL; } - if (timeStamp != Long.MAX_VALUE) { - maxTimestamp = timeStamp; + + if (timestamp != Long.MAX_VALUE) { + maxTimestamp = timestamp; if (minTimestamp == Long.MAX_VALUE) { - minTimestamp = timeStamp; + minTimestamp = timestamp; } } + appendPosition += byteBuf.remaining(); - uploadBufferList.add(byteBuf); + + // deep copy buffer + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(byteBuf.remaining()); + byteBuffer.put(byteBuf); + byteBuffer.flip(); + byteBuf.rewind(); + + bufferList.add(byteBuffer); return AppendResult.SUCCESS; } finally { bufferLock.unlock(); @@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> return appendPosition; } - @VisibleForTesting public void setAppendPosition(long appendPosition) { this.appendPosition = appendPosition; } @@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> if (closed) { return false; } + // result is false when we send real commit request + // use join for wait flight request done Boolean result = commitAsync().join(); if (!result) { result = flightCommitRequest.join(); @@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> return result; } + private void releaseCommitLock() { + if (commitLock.availablePermits() == 0) { + commitLock.release(); + } else { + logger.error("[Bug] FileSegmentCommitAsync, lock is already released: available permits: {}", + commitLock.availablePermits()); + } + } + + private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) { + if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { + dispatchCommitOffset = + MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); + } + } + + /** + * @return false: commit, true: no commit operation + */ @SuppressWarnings("NonAtomicOperationOnVolatileField") public CompletableFuture<Boolean> commitAsync() { if (closed) { return CompletableFuture.completedFuture(false); } - Stopwatch stopwatch = Stopwatch.createStarted(); + if (!needCommit()) { return CompletableFuture.completedFuture(true); } - try { - int permits = commitLock.drainPermits(); - if (permits <= 0) { - return CompletableFuture.completedFuture(false); - } - } catch (Exception e) { + + if (commitLock.drainPermits() <= 0) { return CompletableFuture.completedFuture(false); } - List<ByteBuffer> bufferList = rollingUploadBuffer(); - int bufferSize = 0; - for (ByteBuffer buffer : bufferList) { - bufferSize += buffer.remaining(); - } - if (codaBuffer != null) { - bufferSize += codaBuffer.remaining(); - } - if (bufferSize == 0) { - return CompletableFuture.completedFuture(true); - } - TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build( - fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); - int finalBufferSize = bufferSize; + try { - flightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) + if (fileSegmentInputStream != null) { + long fileSize = this.getSize(); + if (fileSize == -1L) { + logger.error("Get commit position error before commit, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", + commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); + releaseCommitLock(); + return CompletableFuture.completedFuture(false); + } else { + if (correctPosition(fileSize, null)) { + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + fileSegmentInputStream = null; + } + } + } + + int bufferSize; + if (fileSegmentInputStream != null) { + bufferSize = fileSegmentInputStream.available(); + } else { + List<ByteBuffer> bufferList = borrowBuffer(); + bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum() + + (codaBuffer != null ? codaBuffer.remaining() : 0); + if (bufferSize == 0) { + releaseCommitLock(); + return CompletableFuture.completedFuture(true); + } + fileSegmentInputStream = FileSegmentInputStreamFactory.build( + fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); + } + + return flightCommitRequest = this + .commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) .thenApply(result -> { if (result) { - if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { - dispatchCommitOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); - } - commitPosition += finalBufferSize; + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + commitPosition += bufferSize; + fileSegmentInputStream = null; return true; - } - sendBackBuffer(inputStream); - return false; - }) - .exceptionally(e -> handleCommitException(inputStream, e)) - .whenComplete((result, e) -> { - if (commitLock.availablePermits() == 0) { - logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); - commitLock.release(); } else { - logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); + fileSegmentInputStream.rewind(); + return false; } - }); - return flightCommitRequest; + }) + .exceptionally(this::handleCommitException) + .whenComplete((result, e) -> releaseCommitLock()); + } catch (Exception e) { - handleCommitException(inputStream, e); - if (commitLock.availablePermits() == 0) { - logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); - commitLock.release(); - } else { - logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); - } + handleCommitException(e); + releaseCommitLock(); } return CompletableFuture.completedFuture(false); } - private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) { + private long getCorrectFileSize(Throwable throwable) { + if (throwable instanceof TieredStoreException) { + long fileSize = ((TieredStoreException) throwable).getPosition(); + if (fileSize > 0) { + return fileSize; + } + } + return getSize(); + } + + private boolean handleCommitException(Throwable e) { + // Get root cause here Throwable cause = e.getCause() != null ? e.getCause() : e; - sendBackBuffer(inputStream); - long realSize = 0; - if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) { - realSize = ((TieredStoreException) cause).getPosition(); + long fileSize = this.getCorrectFileSize(cause); + + if (fileSize == -1L) { + logger.error("Get commit position error, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", + commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); + fileSegmentInputStream.rewind(); + return false; + } + + if (correctPosition(fileSize, cause)) { + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + fileSegmentInputStream = null; + return true; + } else { + fileSegmentInputStream.rewind(); + return false; } - if (realSize <= 0) { - realSize = getSize(); + } + + /** + * return true to clear buffer + */ + private boolean correctPosition(long fileSize, Throwable throwable) { + + // Current we have three offsets here: commit offset, expect offset, file size. + // We guarantee that the commit offset is less than or equal to the expect offset. + // Max offset will increase because we can continuously put in new buffers + String handleInfo = throwable == null ? "before commit" : "after commit"; + long expectPosition = commitPosition + fileSegmentInputStream.getContentLength(); + + String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " + + "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s", + handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath()); + + // We are believing that the file size returned by the server is correct, + // can reset the commit offset to the file size reported by the storage system. + if (fileSize == expectPosition) { + logger.info(offsetInfo, "Success", throwable); + commitPosition = fileSize; + return true; } - if (realSize > 0 && realSize > commitPosition) { - logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); - // TODO check if this diff part is uploaded to backend storage - long diff = appendPosition - commitPosition; - commitPosition = realSize; - appendPosition = realSize + diff; - // TODO check if appendPosition is large than maxOffset - } else if (realSize < commitPosition) { - logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); + + if (fileSize < commitPosition) { + logger.error(offsetInfo, "FileSizeIncorrect", throwable); + } else if (fileSize == commitPosition) { + logger.warn(offsetInfo, "CommitFailed", throwable); + } else if (fileSize > commitPosition) { + logger.warn(offsetInfo, "PartialSuccess", throwable); } + commitPosition = fileSize; return false; } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java index 5a0ca25f5..0db3eaf8f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; public interface TieredStoreProvider { @@ -30,7 +30,9 @@ public interface TieredStoreProvider { String getPath(); /** - * Get file size in backend file system + * Get the real length of the file. + * Return 0 if the file does not exist, + * Return -1 if system get size failed. * * @return file real size */ @@ -71,5 +73,5 @@ public interface TieredStoreProvider { * @param append try to append or create a new file * @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise */ - CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,long position, int length, boolean append); + CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long position, int length, boolean append); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 52be90b1d..7e949cb28 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; @@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment { @Override public CompletableFuture<Boolean> commit0( - TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { + FileSegmentInputStream inputStream, long position, int length, boolean append) { Stopwatch stopwatch = Stopwatch.createStarted(); AttributesBuilder attributesBuilder = newAttributesBuilder() diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java similarity index 88% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java index c70bb7656..13b6e0ef9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.io.IOException; import java.nio.ByteBuffer; @@ -23,20 +23,23 @@ import java.util.List; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; -public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { +public class CommitLogInputStream extends FileSegmentInputStream { /** * commitLogOffset is the real physical offset of the commitLog buffer which is being read */ + private final long startCommitLogOffset; + private long commitLogOffset; private final ByteBuffer codaBuffer; private long markCommitLogOffset = -1; - public TieredCommitLogInputStream(FileSegmentType fileType, long startOffset, + public CommitLogInputStream(FileSegmentType fileType, long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) { super(fileType, uploadBufferList, contentLength); + this.startCommitLogOffset = startOffset; this.commitLogOffset = startOffset; this.codaBuffer = codaBuffer; } @@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { this.commitLogOffset = markCommitLogOffset; } + @Override + public synchronized void rewind() { + super.rewind(); + this.commitLogOffset = this.startCommitLogOffset; + if (this.codaBuffer != null) { + this.codaBuffer.rewind(); + } + } + @Override public ByteBuffer getCodaBuffer() { return this.codaBuffer; @@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { return -1; } readPosition++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { return readCoda(); } int res; if (readPosInCurBuffer >= curBuffer.remaining()) { curReadBufferIndex++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { readPosInCurBuffer = 0; return readCoda(); } - curBuffer = uploadBufferList.get(curReadBufferIndex); + curBuffer = bufferList.get(curReadBufferIndex); commitLogOffset += readPosInCurBuffer; readPosInCurBuffer = 0; } @@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { int posInCurBuffer = readPosInCurBuffer; long curCommitLogOffset = commitLogOffset; ByteBuffer curBuf = curBuffer; - while (needRead > 0 && bufIndex <= uploadBufferList.size()) { + while (needRead > 0 && bufIndex <= bufferList.size()) { int readLen, remaining, realReadLen = 0; - if (bufIndex == uploadBufferList.size()) { + if (bufIndex == bufferList.size()) { // read from coda buffer remaining = codaBuffer.remaining() - posInCurBuffer; readLen = Math.min(remaining, needRead); @@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { } remaining = curBuf.remaining() - posInCurBuffer; readLen = Math.min(remaining, needRead); - curBuf = uploadBufferList.get(bufIndex); + curBuf = bufferList.get(bufIndex); if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) { realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen); // read from commitLog buffer diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java similarity index 77% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java index e1758ca93..9e9d5135c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.tieredstore.common.FileSegmentType; -public class TieredFileSegmentInputStream extends InputStream { +public class FileSegmentInputStream extends InputStream { /** * file type, can be commitlog, consume queue or indexfile now @@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream { /** * hold bytebuffer */ - protected final List<ByteBuffer> uploadBufferList; + protected final List<ByteBuffer> bufferList; /** * total remaining of bytebuffer list @@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends InputStream { private int markReadPosInCurBuffer = -1; - public TieredFileSegmentInputStream(FileSegmentType fileType, List<ByteBuffer> uploadBufferList, - int contentLength) { + public FileSegmentInputStream( + FileSegmentType fileType, List<ByteBuffer> bufferList, int contentLength) { this.fileType = fileType; this.contentLength = contentLength; - this.uploadBufferList = uploadBufferList; - if (uploadBufferList != null && uploadBufferList.size() > 0) { - this.curBuffer = uploadBufferList.get(curReadBufferIndex); + this.bufferList = bufferList; + if (bufferList != null && bufferList.size() > 0) { + this.curBuffer = bufferList.get(curReadBufferIndex); } } @@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends InputStream { this.readPosition = markReadPosition; this.curReadBufferIndex = markCurReadBufferIndex; this.readPosInCurBuffer = markReadPosInCurBuffer; - if (this.curReadBufferIndex < uploadBufferList.size()) { - this.curBuffer = uploadBufferList.get(curReadBufferIndex); + if (this.curReadBufferIndex < bufferList.size()) { + this.curBuffer = bufferList.get(curReadBufferIndex); } } + public synchronized void rewind() { + this.readPosition = 0; + this.curReadBufferIndex = 0; + this.readPosInCurBuffer = 0; + if (CollectionUtils.isNotEmpty(bufferList)) { + this.curBuffer = bufferList.get(0); + for (ByteBuffer buffer : bufferList) { + buffer.rewind(); + } + } + } + + public int getContentLength() { + return contentLength; + } + @Override public int available() { return contentLength - readPosition; } - public List<ByteBuffer> getUploadBufferList() { - return uploadBufferList; + public List<ByteBuffer> getBufferList() { + return bufferList; } public ByteBuffer getCodaBuffer() { @@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends InputStream { readPosition++; if (readPosInCurBuffer >= curBuffer.remaining()) { curReadBufferIndex++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { return -1; } - curBuffer = uploadBufferList.get(curReadBufferIndex); + curBuffer = bufferList.get(curReadBufferIndex); readPosInCurBuffer = 0; } return curBuffer.get(readPosInCurBuffer++) & 0xff; @@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends InputStream { int bufIndex = curReadBufferIndex; int posInCurBuffer = readPosInCurBuffer; ByteBuffer curBuf = curBuffer; - while (needRead > 0 && bufIndex < uploadBufferList.size()) { - curBuf = uploadBufferList.get(bufIndex); + while (needRead > 0 && bufIndex < bufferList.size()) { + curBuf = bufferList.get(bufIndex); int remaining = curBuf.remaining() - posInCurBuffer; int readLen = Math.min(remaining, needRead); // read from curBuf diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java similarity index 54% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java index d0c983fd4..a90baff3a 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java @@ -15,30 +15,34 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.tieredstore.common.FileSegmentType; -public class TieredFileSegmentInputStreamFactory { +public class FileSegmentInputStreamFactory { - public static TieredFileSegmentInputStream build(FileSegmentType fileType, - long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) { + public static FileSegmentInputStream build( + FileSegmentType fileType, long offset, List<ByteBuffer> bufferList, ByteBuffer byteBuffer, int length) { + + if (bufferList == null) { + throw new IllegalArgumentException("bufferList is null"); + } switch (fileType) { case COMMIT_LOG: - return new TieredCommitLogInputStream( - fileType, startOffset, uploadBufferList, codaBuffer, contentLength); + return new CommitLogInputStream( + fileType, offset, bufferList, byteBuffer, length); case CONSUME_QUEUE: - return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); + return new FileSegmentInputStream(fileType, bufferList, length); case INDEX: - if (uploadBufferList.size() != 1) { - throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1"); + if (bufferList.size() != 1) { + throw new IllegalArgumentException("buffer block size must be 1 when file type is IndexFile"); } - return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); + return new FileSegmentInputStream(fileType, bufferList, length); default: - throw new IllegalArgumentException("fileType is not supported"); + throw new IllegalArgumentException("file type is not supported"); } } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 8601392e7..2451199c2 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -130,36 +130,36 @@ public class TieredMessageStoreTest { // TieredStorageLevel.DISABLE properties.setProperty("tieredStorageLevel", "0"); configuration.update(properties); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.NOT_IN_DISK properties.setProperty("tieredStorageLevel", "1"); configuration.update(properties); when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.NOT_IN_MEM properties.setProperty("tieredStorageLevel", "2"); configuration.update(properties); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(false); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.FORCE properties.setProperty("tieredStorageLevel", "3"); configuration.update(properties); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); } @Test diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java index cc39cfbfc..7a4d05969 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; @@ -55,6 +56,7 @@ public class TieredFlatFileTest { public void tearDown() throws IOException { TieredStoreTestUtil.destroyMetadataStore(); TieredStoreTestUtil.destroyTempDir(storePath); + TieredStoreExecutor.shutdown(); } private List<FileSegmentMetadata> getSegmentMetadataList(TieredMetadataStore metadataStore) { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java index 262d6645b..2da72bc7a 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java @@ -87,5 +87,7 @@ public class TieredIndexFileTest { indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); Assert.assertEquals(1, indexList.size()); + + indexFile.destroy(); } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java similarity index 82% rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java index a6566b7de..3bbe41dd4 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java @@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; -public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream { +public class MockFileSegmentInputStream extends FileSegmentInputStream { private final InputStream inputStream; - public MockTieredFileSegmentInputStream(InputStream inputStream) { + public MockFileSegmentInputStream(InputStream inputStream) { super(null, null, Integer.MAX_VALUE); this.inputStream = inputStream; } @@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStre } @Override - public List<ByteBuffer> getUploadBufferList() { + public List<ByteBuffer> getBufferList() { return null; } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java index a2554ba3d..743d9182c 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java @@ -28,8 +28,8 @@ import java.util.Random; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.junit.Assert; @@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest { bufferSize += byteBuffer.remaining(); } - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest { int[] batchReadSizeTestSet = { MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1 }; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet); } @@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest { int codaBufferSize = codaBuffer.remaining(); bufferSize += codaBufferSize; - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest { MSG_LEN - 1, MSG_LEN, MSG_LEN + 1, bufferSize - 1, bufferSize, bufferSize + 1 }; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet); } @@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest { bufferSize += byteBuffer.remaining(); } - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest { int finalBufferSize = bufferSize; int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1}; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet); } @@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest { byteBuffer.flip(); List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer); - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = byteBuffer.slice(); - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25}); } - private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor, + private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<FileSegmentInputStream> constructor, int bufferSize, int[] readBatchSizeTestSet) { - TieredFileSegmentInputStream inputStream = constructor.get(); + FileSegmentInputStream inputStream = constructor.get(); // verify verifyInputStream(inputStream, expectedByteBuffer); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java index 4cd83e0d2..a655710a5 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java @@ -116,13 +116,22 @@ public class TieredFileSegmentTest { } @Test - public void testCommitFailed() { + public void testCommitFailedThenSuccess() { long startTime = System.currentTimeMillis(); MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); long lastSize = segment.getSize(); - segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); - segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); - + segment.setCheckSize(false); + segment.initPosition(lastSize); + segment.setSize((int) lastSize); + + ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); + ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); + segment.append(buffer1, 0); + segment.append(buffer2, 0); + + // Mock new message arrive segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { @@ -131,20 +140,88 @@ public class TieredFileSegmentTest { Assert.fail(e.getMessage()); } ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); segment.append(buffer, 0); segment.blocker.complete(false); }).start(); + // Commit failed segment.commit(); segment.blocker.join(); + segment.blocker = null; + + // Copy data and assume commit success + segment.getMemStore().put(buffer1); + segment.getMemStore().put(buffer2); + segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2)); - segment.blocker = new CompletableFuture<>(); - segment.blocker.complete(true); segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); + + ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2)); + + ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3)); + } + + @Test + public void testCommitFailed3Times() { + long startTime = System.currentTimeMillis(); + MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); + long lastSize = segment.getSize(); + segment.setCheckSize(false); + segment.initPosition(lastSize); + segment.setSize((int) lastSize); + + ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); + ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); + segment.append(buffer1, 0); + segment.append(buffer2, 0); + + // Mock new message arrive + segment.blocker = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); + buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); + segment.append(buffer, 0); + segment.blocker.complete(false); + }).start(); + + for (int i = 0; i < 3; i++) { + segment.commit(); + } + + Assert.assertEquals(lastSize, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + segment.blocker.join(); + segment.blocker = null; + segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset()); Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java index cb155cf8f..80ad41f68 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.junit.Assert; @@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment { public CompletableFuture<Boolean> blocker; + protected int size = 0; + protected boolean checkSize = true; public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset, @@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment { memStore.position((int) getSize()); } + public boolean isCheckSize() { + return checkSize; + } + + public void setCheckSize(boolean checkSize) { + this.checkSize = checkSize; + } + + public ByteBuffer getMemStore() { + return memStore; + } + @Override public String getPath() { return filePath; @@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment { if (checkSize) { return 1000; } - return 0; + return size; + } + + public void setSize(int size) { + this.size = size; } @Override @@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment { @Override public CompletableFuture<Boolean> commit0( - TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { + FileSegmentInputStream inputStream, long position, int length, boolean append) { try { if (blocker != null && !blocker.get()) { - throw new IllegalStateException(); + throw new IllegalStateException("Commit Exception for Memory Test"); } } catch (InterruptedException | ExecutionException e) { Assert.fail(e.getMessage()); @@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment { Assert.assertTrue(!checkSize || position >= getSize()); byte[] buffer = new byte[1024]; - int startPos = memStore.position(); try { int len; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java index 8ac330b37..630fd2223 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.junit.Assert; @@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment { } @Override - public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length, + public CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, long position, int length, boolean append) { try { if (blocker != null && !blocker.get()) { -- 2.32.0.windows.2 From d000ef947d7c99918ceba0fa451c1e29fd84ba07 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 31 Aug 2023 09:41:33 +0800 Subject: [PATCH 3/7] [ISSUE #7283] Incorrect dledger commitlog min offset after mappedFile re delete failed (#7284) --- .../apache/rocketmq/store/dledger/DLedgerCommitLog.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index ec5e86d70..d5f6acdc0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -162,7 +162,12 @@ public class DLedgerCommitLog extends CommitLog { if (!mappedFileQueue.getMappedFiles().isEmpty()) { return mappedFileQueue.getMinOffset(); } - return dLedgerFileList.getMinOffset(); + for (MmapFile file : dLedgerFileList.getMappedFiles()) { + if (file.isAvailable()) { + return file.getFileFromOffset() + file.getStartPosition(); + } + } + return 0; } @Override -- 2.32.0.windows.2 From f82718ae3b77a16b553c03f672dc971a2d5d48fa Mon Sep 17 00:00:00 2001 From: cnScarb <jjhfen00@163.com> Date: Thu, 31 Aug 2023 15:50:10 +0800 Subject: [PATCH 4/7] [ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209) --- .../processor/AdminBrokerProcessor.java | 24 ++++++++++--- .../processor/AdminBrokerProcessorTest.java | 36 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index bbddcec2d..8fbcd3c94 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } } - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); - this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic()); - this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic()); - this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic()); - this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic())); + final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); + // delete pop retry topics first + for (String group : groups) { + final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { + deleteTopicInBroker(popRetryTopic); + } + } + // delete topic + deleteTopicInBroker(topic); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } + private void deleteTopicInBroker(String topic) { + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); + this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); + this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); + } + private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index d33a217f7..9d17011b6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.broker.BrokerController; @@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest { "please execute it from master broker."); } + @Test + public void testDeleteWithPopRetryTopic() throws Exception { + String topic = "topicA"; + String anotherTopic = "another_topicA"; + + topicConfigManager = mock(TopicConfigManager.class); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); + topicConfigTable.put(topic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig()); + + topicConfigTable.put(anotherTopic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig()); + when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); + when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> { + final String selectTopic = invocation.getArgument(0); + return topicConfigManager.getTopicConfigTable().get(selectTopic); + }); + + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1")); + + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + verify(topicConfigManager).deleteTopicConfig(topic); + verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1")); + verify(messageStore, times(2)).deleteTopics(anySet()); + } + @Test public void testGetAllTopicConfigInRocksdb() throws Exception { if (notToBeExecuted()) { -- 2.32.0.windows.2 From 31d10385d1616445478104ce9ef463a8c4852ba2 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Mon, 4 Sep 2023 14:09:32 +0800 Subject: [PATCH 5/7] [ISSUE #7289] Fixed asynchronous send backpressure capability Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../impl/producer/DefaultMQProducerImpl.java | 77 +++++++++++++------ 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index bbbb17b07..2d6b83ac2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Deprecated public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { + BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); + final long beginStartTime = System.currentTimeMillis(); Runnable runnable = new Runnable() { @Override @@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements MQProducerInner { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { - sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); + sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime); } catch (Exception e) { - sendCallback.onException(e); + newCallBack.onException(e); } } else { - sendCallback.onException( + newCallBack.onException( new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); } } }; - executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); + executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); } - public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback, + class BackpressureSendCallBack implements SendCallback { + public boolean isSemaphoreAsyncSizeAquired = false; + public boolean isSemaphoreAsyncNumAquired = false; + public int msgLen; + private final SendCallback sendCallback; + + public BackpressureSendCallBack(final SendCallback sendCallback) { + this.sendCallback = sendCallback; + } + + @Override + public void onSuccess(SendResult sendResult) { + if (isSemaphoreAsyncSizeAquired) { + semaphoreAsyncSendSize.release(msgLen); + } + if (isSemaphoreAsyncNumAquired) { + semaphoreAsyncSendNum.release(); + } + sendCallback.onSuccess(sendResult); + } + + @Override + public void onException(Throwable e) { + if (isSemaphoreAsyncSizeAquired) { + semaphoreAsyncSendSize.release(msgLen); + } + if (isSemaphoreAsyncNumAquired) { + semaphoreAsyncSendNum.release(); + } + sendCallback.onException(e); + } + } + + public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback, final long timeout, final long beginStartTime) throws MQClientException, InterruptedException { ExecutorService executor = this.getAsyncSenderExecutor(); @@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { return; } } - + sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired; + sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired; + sendCallback.msgLen = msgLen; executor.submit(runnable); } catch (RejectedExecutionException e) { if (isEnableBackpressureForAsyncMode) { @@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } else { throw new MQClientException("executor rejected ", e); } - } finally { - if (isSemaphoreAsyncSizeAquired) { - semaphoreAsyncSendSize.release(msgLen); - } - if (isSemaphoreAsyncNumAquired) { - semaphoreAsyncSendNum.release(); - } } - } public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, @@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Deprecated public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { - + BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); final long beginStartTime = System.currentTimeMillis(); Runnable runnable = new Runnable() { @Override @@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { - sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, newCallBack, null, timeout - costTime); } catch (MQBrokerException e) { throw new MQClientException("unknown exception", e); } } else { - sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); + newCallBack.onException(new RemotingTooMuchRequestException("call timeout")); } } catch (Exception e) { - sendCallback.onException(e); + newCallBack.onException(e); } } }; - executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); + executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); } /** @@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { - + BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); final long beginStartTime = System.currentTimeMillis(); Runnable runnable = new Runnable() { @Override @@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (timeout > costTime) { try { try { - sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack, timeout - costTime); } catch (MQBrokerException e) { throw new MQClientException("unknown exception", e); } } catch (Exception e) { - sendCallback.onException(e); + newCallBack.onException(e); } } else { - sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); + newCallBack.onException(new RemotingTooMuchRequestException("call timeout")); } } }; - executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); + executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); } /** -- 2.32.0.windows.2 From d67b9d64cbd53824798af57ba18770e0fcefa37a Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 6 Sep 2023 14:07:23 +0800 Subject: [PATCH 6/7] [ISSUE #7302] Fix singleTopicRegister code deleted in merge --- .../apache/rocketmq/broker/topic/TopicConfigManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 1c3b9711f..4e3c1736c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -330,7 +330,11 @@ public class TopicConfigManager extends ConfigManager { log.error("createTopicIfAbsent ", e); } if (createNew && register) { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { + this.brokerController.registerSingleTopicAll(topicConfig); + } else { + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); + } } return getTopicConfig(topicConfig.getTopicName()); } -- 2.32.0.windows.2 From 37017dbaec5c521fd529ef4aecf3658092884f84 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 6 Sep 2023 15:23:15 +0800 Subject: [PATCH 7/7] [ISSUE #7305] Fix metrics and transactional module not shutdown while broker offline cause coredump(#7307) --- .../java/org/apache/rocketmq/broker/BrokerController.java | 8 ++++++++ .../queue/TransactionalMessageServiceImpl.java | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e8f943702..6aba70cb2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1302,6 +1302,10 @@ public class BrokerController { this.fastRemotingServer.shutdown(); } + if (this.brokerMetricsManager != null) { + this.brokerMetricsManager.shutdown(); + } + if (this.brokerStatsManager != null) { this.brokerStatsManager.shutdown(); } @@ -1324,6 +1328,10 @@ public class BrokerController { this.ackMessageProcessor.shutdownPopReviveService(); } + if (this.transactionalMessageService != null) { + this.transactionalMessageService.close(); + } + if (this.notificationProcessor != null) { this.notificationProcessor.getPopLongPollingService().shutdown(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 93fa725a9..48db828e0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -629,7 +629,9 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ @Override public void close() { - + if (this.transactionalOpBatchService != null) { + this.transactionalOpBatchService.shutdown(); + } } public Message getOpMessage(int queueId, String moreData) { -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2