Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch006-backport-auto-batch-p...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch006-backport-auto-batch-producer.patch of Package rocketmq
From 737c1e53383350a5671fa207ee0e4ce932850bac Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Tue, 18 Jul 2023 14:12:39 +0800 Subject: [PATCH 1/7] [ISSUE #7029] Add a config to determine whether pop response should return the actual retry topic or tamper with the original topic (#7030) --- .../broker/processor/PopMessageProcessor.java | 4 ++-- .../org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 464f8f4fd..53e172561 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -591,8 +591,8 @@ public class PopMessageProcessor implements NettyRequestProcessor { atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get()); String brokerName = brokerController.getBrokerConfig().getBrokerName(); for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) { - // We should not recode buffer for normal topic message - if (!isRetry) { + // We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic + if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) { getMessageResult.addMessage(mapedBuffer); } else { List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(), diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f5f0db101..a4d82d1c5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity { */ private long fetchNamesrvAddrInterval = 10 * 1000; + /** + * Pop response returns the actual retry topic rather than tampering with the original topic + */ + private boolean popResponseReturnActualRetryTopic = false; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity { public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) { this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval; } + + public boolean isPopResponseReturnActualRetryTopic() { + return popResponseReturnActualRetryTopic; + } + + public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) { + this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic; + } } -- 2.32.0.windows.2 From 7996ec3b3f7ccea01f66951ac639b48303bbf7a6 Mon Sep 17 00:00:00 2001 From: leeyiyu <43566239+leeyiyu@users.noreply.github.com> Date: Tue, 18 Jul 2023 20:58:56 +0800 Subject: [PATCH 2/7] [ISSUE #6879] ConcurrentHashMapUtils fails to solve the loop bug in JDK8 (#6883) --- .../common/utils/ConcurrentHashMapUtils.java | 16 +++++++++++++++- .../common/utils/ConcurrentHashMapUtilsTest.java | 2 ++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java index 1f1b4dd89..6fd9c21c9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common.utils; +import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; @@ -40,10 +41,23 @@ public abstract class ConcurrentHashMapUtils { * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a> */ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) { + Objects.requireNonNull(func); if (isJdk8) { V v = map.get(key); if (null == v) { - v = map.computeIfAbsent(key, func); +// v = map.computeIfAbsent(key, func); + + // this bug fix methods maybe cause `func.apply` multiple calls. + v = func.apply(key); + if (null == v) { + return null; + } + final V res = map.putIfAbsent(key, v); + if (null != res) { + // if pre value present, means other thread put value already, and putIfAbsent not effect + // return exist value + return res; + } } return v; } else { diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java index 8e32fc93a..fa97ddb1c 100644 --- a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java @@ -35,5 +35,7 @@ public class ConcurrentHashMapUtilsTest { assertEquals("2342", value1); String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342"); assertEquals("1111", value2); +// map.computeIfAbsent("AaAa", key->map.computeIfAbsent("BBBB",key2->"42")); + ConcurrentHashMapUtils.computeIfAbsent(map, "AaAa", key -> map.computeIfAbsent("BBBB", key2 -> "42")); } } \ No newline at end of file -- 2.32.0.windows.2 From e0f5295fed8791d93bfa5b8420074c00b651ddfe Mon Sep 17 00:00:00 2001 From: lk <xdkxlk@outlook.com> Date: Wed, 19 Jul 2023 15:55:11 +0800 Subject: [PATCH 3/7] passing the renew event type to create the correct context (#7045) --- .../apache/rocketmq/proxy/common/RenewEvent.java | 14 +++++++++++++- .../proxy/processor/ReceiptHandleProcessor.java | 2 +- .../receipt/DefaultReceiptHandleManager.java | 6 +++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java index fdf9833cc..0ff65c1cc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java @@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult; public class RenewEvent { protected MessageReceiptHandle messageReceiptHandle; protected long renewTime; + protected EventType eventType; protected CompletableFuture<AckResult> future; - public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture<AckResult> future) { + public enum EventType { + RENEW, + STOP_RENEW, + CLEAR_GROUP + } + + public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture<AckResult> future) { this.messageReceiptHandle = messageReceiptHandle; this.renewTime = renewTime; + this.eventType = eventType; this.future = future; } @@ -39,6 +47,10 @@ public class RenewEvent { return renewTime; } + public EventType getEventType() { + return eventType; + } + public CompletableFuture<AckResult> getFuture() { return future; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index fc49e7622..460842a86 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { super(messagingProcessor, serviceManager); StateEventListener<RenewEvent> eventListener = event -> { - ProxyContext context = createContext("RenewMessage"); + ProxyContext context = createContext(event.getEventType().name()); MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index c7633d658..9f35435f0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when renew. handle:{}", messageReceiptHandle, throwable); @@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); @@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem try { handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); return CompletableFuture.completedFuture(null); }); } catch (Exception e) { -- 2.32.0.windows.2 From 2c5808b9fdab8cae63318c89f34ad48a1ab6e962 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 19 Jul 2023 20:14:02 +0800 Subject: [PATCH 4/7] [#ISSUE 7035] Fix correct min offset behavior in tiered storage (#7038) --- .../tieredstore/TieredDispatcher.java | 2 +- .../tieredstore/TieredMessageFetcher.java | 5 + .../tieredstore/file/CompositeFlatFile.java | 12 +- .../tieredstore/file/TieredCommitLog.java | 46 +++++++- .../tieredstore/file/TieredFlatFile.java | 29 +++-- .../file/TieredFlatFileManager.java | 2 +- .../metrics/TieredStoreMetricsConstant.java | 1 + .../provider/posix/PosixFileSegment.java | 19 +-- .../tieredstore/file/TieredCommitLogTest.java | 108 ++++++++++++++++++ .../TieredStoreMetricsManagerTest.java | 4 +- .../src/test/resources/rmq.logback-test.xml | 2 +- 11 files changed, 201 insertions(+), 29 deletions(-) create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 6584b0e89..523b0c2cd 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -352,7 +352,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch case SUCCESS: long offset = MessageBufferUtil.getQueueOffset(message); if (queueOffset != offset) { - logger.error("Message cq offset in commitlog does not meet expectations, " + + logger.warn("Message cq offset in commitlog does not meet expectations, " + "result={}, topic={}, queueId={}, cq offset={}, msg offset={}", AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index 8802a73a3..c4fed54bd 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -473,6 +473,11 @@ public class TieredMessageFetcher implements MessageStoreFetcher { return CompletableFuture.completedFuture(result); } + // request range | result + // (0, min) | too small + // [min, max) | correct + // [max, max] | overflow one + // (max, +oo) | overflow badly if (queueOffset < minQueueOffset) { result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL); result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset()); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java index 8f8ba98b1..fa01382e1 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java @@ -120,17 +120,19 @@ public class CompositeFlatFile implements CompositeAccess { return commitLog.getBeginTimestamp(); } - public long getConsumeQueueBaseOffset() { - return consumeQueue.getBaseOffset(); - } - @Override public long getCommitLogDispatchCommitOffset() { return commitLog.getDispatchCommitOffset(); } + public long getConsumeQueueBaseOffset() { + return consumeQueue.getBaseOffset(); + } + public long getConsumeQueueMinOffset() { - return consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; + long cqOffset = consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; + long effectiveOffset = this.commitLog.getMinConsumeQueueOffset(); + return Math.max(cqOffset, effectiveOffset); } public long getConsumeQueueCommitOffset() { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java index 92aea58be..80e1bce50 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.tieredstore.common.AppendResult; @@ -31,6 +32,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; public class TieredCommitLog { private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + private static final Long NOT_EXIST_MIN_OFFSET = -1L; /** * item size: int, 4 bytes @@ -42,10 +44,13 @@ public class TieredCommitLog { private final TieredMessageStoreConfig storeConfig; private final TieredFlatFile flatFile; + private final AtomicLong minConsumeQueueOffset; public TieredCommitLog(TieredFileAllocator fileQueueFactory, String filePath) { this.storeConfig = fileQueueFactory.getStoreConfig(); this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); + this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET); + this.correctMinOffset(); } @VisibleForTesting @@ -61,6 +66,10 @@ public class TieredCommitLog { return flatFile.getCommitOffset(); } + public long getMinConsumeQueueOffset() { + return minConsumeQueueOffset.get() != NOT_EXIST_MIN_OFFSET ? minConsumeQueueOffset.get() : correctMinOffset(); + } + public long getDispatchCommitOffset() { return flatFile.getDispatchCommitOffset(); } @@ -82,6 +91,39 @@ public class TieredCommitLog { return flatFile.getFileToWrite().getMaxTimestamp(); } + public synchronized long correctMinOffset() { + if (flatFile.getFileSegmentCount() == 0) { + this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); + return NOT_EXIST_MIN_OFFSET; + } + + // queue offset field length is 8 + int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8; + if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) { + this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); + return NOT_EXIST_MIN_OFFSET; + } + + try { + return this.flatFile.readAsync(this.flatFile.getMinOffset(), length) + .thenApply(buffer -> { + long offset = MessageBufferUtil.getQueueOffset(buffer); + minConsumeQueueOffset.set(offset); + log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}", + flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset()); + return offset; + }) + .exceptionally(throwable -> { + log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}", + flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable); + return minConsumeQueueOffset.get(); + }).get(); + } catch (Exception e) { + log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e); + } + return minConsumeQueueOffset.get(); + } + public AppendResult append(ByteBuffer byteBuf) { return flatFile.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf)); } @@ -99,7 +141,9 @@ public class TieredCommitLog { } public void cleanExpiredFile(long expireTimestamp) { - flatFile.cleanExpiredFile(expireTimestamp); + if (flatFile.cleanExpiredFile(expireTimestamp) > 0) { + correctMinOffset(); + } } public void destroyExpiredFile() { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index a71323348..90ca843bf 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -133,6 +133,14 @@ public class TieredFlatFile { } } + public String getFilePath() { + return filePath; + } + + public FileSegmentType getFileType() { + return fileType; + } + @VisibleForTesting public List<TieredFileSegment> getFileSegmentList() { return fileSegmentList; @@ -333,10 +341,9 @@ public class TieredFlatFile { TieredFileSegment fileSegment = this.newSegment(fileType, offset, true); fileSegmentList.add(fileSegment); needCommitFileSegmentList.add(fileSegment); - Collections.sort(fileSegmentList); - - logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType); + logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", + offset, fileSegment.getPath(), fileType); return fileSegment; } finally { fileSegmentLock.writeLock().unlock(); @@ -429,7 +436,7 @@ public class TieredFlatFile { return result; } - public void cleanExpiredFile(long expireTimestamp) { + public int cleanExpiredFile(long expireTimestamp) { Set<Long> needToDeleteSet = new HashSet<>(); try { tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> { @@ -438,32 +445,32 @@ public class TieredFlatFile { } }); } catch (Exception e) { - logger.error("clean expired failed: filePath: {}, file type: {}, expire timestamp: {}", + logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}", filePath, fileType, expireTimestamp); } if (needToDeleteSet.isEmpty()) { - return; + return 0; } fileSegmentLock.writeLock().lock(); try { for (int i = 0; i < fileSegmentList.size(); i++) { + TieredFileSegment fileSegment = fileSegmentList.get(i); try { - TieredFileSegment fileSegment = fileSegmentList.get(i); if (needToDeleteSet.contains(fileSegment.getBaseOffset())) { fileSegment.close(); fileSegmentList.remove(fileSegment); needCommitFileSegmentList.remove(fileSegment); i--; this.updateFileSegment(fileSegment); - logger.info("expired file {} is been cleaned", fileSegment.getPath()); + logger.debug("Clean expired file, filePath: {}", fileSegment.getPath()); } else { break; } } catch (Exception e) { - logger.error("clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}", - filePath, fileType, expireTimestamp, e); + logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}", + fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e); } } if (fileSegmentList.size() > 0) { @@ -476,6 +483,7 @@ public class TieredFlatFile { } finally { fileSegmentLock.writeLock().unlock(); } + return needToDeleteSet.size(); } @VisibleForTesting @@ -493,7 +501,6 @@ public class TieredFlatFile { fileSegment.destroyFile(); if (!fileSegment.exists()) { tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset()); - logger.info("Destroyed expired file, file path: {}", fileSegment.getPath()); } } catch (Exception e) { logger.error("Destroyed expired file failed, file path: {}, file type: {}", diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index 5fe511f68..aeca44b8c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -223,7 +223,7 @@ public class TieredFlatFileManager { public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) { return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> { try { - logger.info("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + + logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + "try to create new flat file: topic: {}, queueId: {}", messageQueue.getTopic(), messageQueue.getQueueId()); return new CompositeQueueFlatFile(tieredFileAllocator, mq); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java index ad7281510..cb4674ea9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java @@ -38,6 +38,7 @@ public class TieredStoreMetricsConstant { public static final String LABEL_OPERATION = "operation"; public static final String LABEL_SUCCESS = "success"; + public static final String LABEL_PATH = "path"; public static final String LABEL_TOPIC = "topic"; public static final String LABEL_GROUP = "group"; public static final String LABEL_QUEUE_ID = "queue_id"; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 8c0d1cbcd..52be90b1d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -41,8 +41,8 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION; +import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_PATH; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS; -import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC; /** * this class is experimental and may change without notice. @@ -55,6 +55,7 @@ public class PosixFileSegment extends TieredFileSegment { private static final String OPERATION_POSIX_READ = "read"; private static final String OPERATION_POSIX_WRITE = "write"; + private final String fullPath; private volatile File file; private volatile FileChannel readFileChannel; private volatile FileChannel writeFileChannel; @@ -71,7 +72,7 @@ public class PosixFileSegment extends TieredFileSegment { // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset String brokerClusterName = storeConfig.getBrokerClusterName(); String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName; - String fullPath = Paths.get(basePath, clusterBasePath, filePath, + this.fullPath = Paths.get(basePath, clusterBasePath, filePath, fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString(); logger.info("Constructing Posix FileSegment, filePath: {}", fullPath); @@ -80,13 +81,13 @@ public class PosixFileSegment extends TieredFileSegment { protected AttributesBuilder newAttributesBuilder() { return TieredStoreMetricsManager.newAttributesBuilder() - .put(LABEL_TOPIC, filePath) + .put(LABEL_PATH, filePath) .put(LABEL_FILE_TYPE, fileType.name().toLowerCase()); } @Override public String getPath() { - return filePath; + return fullPath; } @Override @@ -107,7 +108,7 @@ public class PosixFileSegment extends TieredFileSegment { if (file == null) { synchronized (this) { if (file == null) { - File file = new File(filePath); + File file = new File(fullPath); try { File dir = file.getParentFile(); if (!dir.exists()) { @@ -136,8 +137,9 @@ public class PosixFileSegment extends TieredFileSegment { if (writeFileChannel != null && writeFileChannel.isOpen()) { writeFileChannel.close(); } + logger.info("Destroy Posix FileSegment, filePath: {}", fullPath); } catch (IOException e) { - logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filePath, e); + logger.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e); } if (file.exists()) { @@ -181,8 +183,9 @@ public class PosixFileSegment extends TieredFileSegment { } @Override - public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length, - boolean append) { + public CompletableFuture<Boolean> commit0( + TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { + Stopwatch stopwatch = Stopwatch.createStarted(); AttributesBuilder attributesBuilder = newAttributesBuilder() .put(LABEL_OPERATION, OPERATION_POSIX_WRITE); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java new file mode 100644 index 000000000..6693d3cb7 --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; +import org.apache.rocketmq.tieredstore.common.AppendResult; +import org.apache.rocketmq.tieredstore.common.FileSegmentType; +import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; +import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; +import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; +import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; +import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TieredCommitLogTest { + + private final String storePath = TieredStoreTestUtil.getRandomStorePath(); + private MessageQueue mq; + private TieredFileAllocator fileAllocator; + private TieredMetadataStore metadataStore; + + @Before + public void setUp() throws ClassNotFoundException, NoSuchMethodException { + TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); + storeConfig.setBrokerName("brokerName"); + storeConfig.setStorePathRootDir(storePath); + storeConfig.setTieredStoreFilePath(storePath + File.separator); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); + storeConfig.setCommitLogRollingInterval(0); + storeConfig.setTieredStoreCommitLogMaxSize(1000); + + metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); + fileAllocator = new TieredFileAllocator(storeConfig); + mq = new MessageQueue("CommitLogTest", storeConfig.getBrokerName(), 0); + TieredStoreExecutor.init(); + } + + @After + public void tearDown() throws IOException { + TieredStoreTestUtil.destroyCompositeFlatFileManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); + TieredStoreExecutor.shutdown(); + } + + @Test + public void correctMinOffsetTest() { + String filePath = TieredStoreUtil.toPath(mq); + TieredCommitLog tieredCommitLog = new TieredCommitLog(fileAllocator, filePath); + Assert.assertEquals(0L, tieredCommitLog.getMinOffset()); + Assert.assertEquals(0L, tieredCommitLog.getCommitOffset()); + Assert.assertEquals(0L, tieredCommitLog.getDispatchCommitOffset()); + + // append some messages + for (int i = 6; i < 50; i++) { + ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + byteBuffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, i); + Assert.assertEquals(AppendResult.SUCCESS, tieredCommitLog.append(byteBuffer)); + } + + tieredCommitLog.commit(true); + tieredCommitLog.correctMinOffset(); + + // single file store: 1000 / 122 = 8, file count: 44 / 8 = 5 + Assert.assertEquals(6, tieredCommitLog.getFlatFile().getFileSegmentCount()); + + metadataStore.iterateFileSegment(filePath, FileSegmentType.COMMIT_LOG, metadata -> { + if (metadata.getBaseOffset() < 1000) { + metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + metadataStore.updateFileSegment(metadata); + } + }); + + // manually delete file + List<TieredFileSegment> segmentList = tieredCommitLog.getFlatFile().getFileSegmentList(); + segmentList.remove(0).destroyFile(); + segmentList.remove(0).destroyFile(); + + tieredCommitLog.correctMinOffset(); + Assert.assertEquals(4, tieredCommitLog.getFlatFile().getFileSegmentCount()); + Assert.assertEquals(6 + 8 + 8, tieredCommitLog.getMinConsumeQueueOffset()); + } +} \ No newline at end of file diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java index a1dde0451..26b38b970 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.rocketmq.tieredstore.TieredMessageFetcher; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.junit.After; import org.junit.Test; @@ -30,9 +31,9 @@ public class TieredStoreMetricsManagerTest { public void tearDown() throws IOException { TieredStoreTestUtil.destroyCompositeFlatFileManager(); TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreExecutor.shutdown(); } - @Test public void getMetricsView() { TieredStoreMetricsManager.getMetricsView(); @@ -40,6 +41,7 @@ public class TieredStoreMetricsManagerTest { @Test public void init() { + TieredStoreExecutor.init(); TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment"); TieredStoreMetricsManager.init(OpenTelemetrySdk.builder().build().getMeter(""), diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml index b70b42046..a7933b5ef 100644 --- a/tieredstore/src/test/resources/rmq.logback-test.xml +++ b/tieredstore/src/test/resources/rmq.logback-test.xml @@ -23,7 +23,7 @@ </encoder> </appender> - <root level="debug"> + <root level="info"> <appender-ref ref="STDOUT" /> </root> </configuration> -- 2.32.0.windows.2 From ebad3c8a6b41915edb3db65fca593123b296042d Mon Sep 17 00:00:00 2001 From: gaoyf <gaoyf@users.noreply.github.com> Date: Thu, 20 Jul 2023 10:59:40 +0800 Subject: [PATCH 5/7] [ISSUE #7047] NettyRemotingClient#invokeOneway throw Exception with address --- .../rocketmq/remoting/netty/NettyRemotingClient.java | 2 +- .../remoting/netty/NettyRemotingClientTest.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index afd779c83..9715b918a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -756,7 +756,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } else { this.closeChannel(addr, channel); - throw new RemotingConnectException(channelRemoteAddr); + throw new RemotingConnectException(addr); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java index efa3eb3d5..8fabbb21d 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -123,4 +124,14 @@ public class NettyRemotingClientTest { Throwable thrown = catchThrowable(future::get); assertThat(thrown.getCause()).isInstanceOf(RemotingException.class); } + + @Test + public void testInvokeOnewayException() throws Exception { + String addr = "0.0.0.0"; + try { + remotingClient.invokeOneway(addr, null, 1000); + } catch (RemotingConnectException e) { + assertThat(e.getMessage()).contains(addr); + } + } } -- 2.32.0.windows.2 From 804f2d85f22d9ee52573b9c6ee6abae248c9b387 Mon Sep 17 00:00:00 2001 From: wenbin yao <ywb992134@163.com> Date: Thu, 20 Jul 2023 11:01:38 +0800 Subject: [PATCH 6/7] [ISSUE ##7036] rename method: getWriteQueueIdByBroker to getWriteQueueNumsByBroker(#7037) * [ISSUE ##7036] rename method: getWriteQueueIdByBroker to getWriteQueueNumsByBroker * [ISSUE #7036] rename method from getWriteQueueIdByBroker to getWriteQueueNumsByBroker --- .../apache/rocketmq/client/impl/producer/TopicPublishInfo.java | 2 +- .../org/apache/rocketmq/client/latency/MQFaultStrategy.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index a5f840500..275ada7ac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -89,7 +89,7 @@ public class TopicPublishInfo { return this.messageQueueList.get(pos); } - public int getWriteQueueIdByBroker(final String brokerName) { + public int getWriteQueueNumsByBroker(final String brokerName) { for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); if (queueData.getBrokerName().equals(brokerName)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index e86238e55..1e1953fad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -69,7 +69,7 @@ public class MQFaultStrategy { } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); - int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker); + int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { -- 2.32.0.windows.2 From af993d28e20922d91862f0911e59f748dcb64e6a Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Fri, 21 Jul 2023 09:31:56 +0800 Subject: [PATCH 7/7] [ISSUE #3717][RIP-27] Auto batching in producer Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> --- .../rocketmq/client/impl/MQClientManager.java | 21 +- .../impl/producer/DefaultMQProducerImpl.java | 36 ++ .../client/producer/DefaultMQProducer.java | 501 +++++++++++------ .../rocketmq/client/producer/MQProducer.java | 24 +- .../client/producer/ProduceAccumulator.java | 510 ++++++++++++++++++ .../producer/DefaultMQProducerTest.java | 38 +- .../producer/ProduceAccumulatorTest.java | 176 ++++++ .../rocketmq/common/message/MessageBatch.java | 2 +- 8 files changed, 1133 insertions(+), 175 deletions(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java create mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index 49186633f..02eaa66e9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.producer.ProduceAccumulator; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -31,6 +32,9 @@ public class MQClientManager { private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<>(); + private ConcurrentMap<String/* clientId */, ProduceAccumulator> accumulatorTable = + new ConcurrentHashMap<String, ProduceAccumulator>(); + private MQClientManager() { @@ -43,7 +47,6 @@ public class MQClientManager { public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { return getOrCreateMQClientInstance(clientConfig, null); } - public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); @@ -62,6 +65,22 @@ public class MQClientManager { return instance; } + public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) { + String clientId = clientConfig.buildMQClientId(); + ProduceAccumulator accumulator = this.accumulatorTable.get(clientId); + if (null == accumulator) { + accumulator = new ProduceAccumulator(clientId); + ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator); + if (prev != null) { + accumulator = prev; + log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId); + } else { + log.info("Created new ProduceAccumulator for clientId:[{}]", clientId); + } + } + + return accumulator; + } public void removeClientFactory(final String clientId) { this.factoryTable.remove(clientId); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 4eb0e6924..3f4c6e5f7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements MQProducerInner { } + public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, + final long timeout) throws MQClientException, RemotingTooMuchRequestException { + long beginStartTime = System.currentTimeMillis(); + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); + + TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + if (topicPublishInfo != null && topicPublishInfo.ok()) { + MessageQueue mq = null; + try { + List<MessageQueue> messageQueueList = + mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); + Message userMessage = MessageAccessor.cloneMessage(msg); + String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); + userMessage.setTopic(userTopic); + + mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); + } catch (Throwable e) { + throw new MQClientException("select message queue threw exception.", e); + } + + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout < costTime) { + throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); + } + if (mq != null) { + return mq; + } else { + throw new MQClientException("select message queue return null.", null); + } + } + + validateNameServerSetting(); + throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); + } + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 6e9ffed8c..c5b1b5223 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.RequestTimeoutException; +import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; @@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.topic.TopicValidator; @@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; /** * This class is the entry point for applications intending to send messages. </p> - * + * <p> * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of * box for most scenarios. </p> - * + * <p> * This class aggregates various <code>send</code> methods to deliver messages to broker(s). Each of them has pros and * cons; you'd better understand strengths and weakness of them before actually coding. </p> * @@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * important when transactional messages are involved. </p> - * + * <p> * For non-transactional messages, it does not matter as long as it's unique per process. </p> - * + * <p> * See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">core concepts</a> for more discussion. */ private String producerGroup; @@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p> - * + * <p> * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendFailed = 2; /** * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p> - * + * <p> * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendAsyncFailed = 2; @@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private TraceDispatcher traceDispatcher = null; + /** + * Switch flag instance for automatic batch message + */ + private boolean autoBatch = false; + /** + * Instance for batching message automatically + */ + private ProduceAccumulator produceAccumulator = null; + /** * Indicate whether to block message when asynchronous sending traffic is too heavy. */ @@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying producer group. * - * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. - * @param enableMsgTrace Switch flag instance for message trace. + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. + * trace topic name. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { @@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying producer group. * - * @param namespace Namespace for this MQ Producer instance. + * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. */ public DefaultMQProducer(final String namespace, final String producerGroup) { @@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Constructor specifying both producer group and RPC hook. * * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. + * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { this(null, producerGroup, rpcHook); @@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying namespace, producer group and RPC hook. * - * @param namespace Namespace for this MQ Producer instance. + * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. + * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } /** * Constructor specifying producer group and enabled msg trace flag. * - * @param producerGroup Producer group, see the name-sake field. + * @param producerGroup Producer group, see the name-sake field. * @param enableMsgTrace Switch flag instance for message trace. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { @@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name. * - * @param producerGroup Producer group, see the name-sake field. - * @param enableMsgTrace Switch flag instance for message trace. + * @param producerGroup Producer group, see the name-sake field. + * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. + * trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic); @@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic * name. * - * @param namespace Namespace for this MQ Producer instance. - * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. - * @param enableMsgTrace Switch flag instance for message trace. + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. + * trace topic name. */ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); //if client open the message trace feature if (enableMsgTrace) { try { @@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); + if (this.produceAccumulator != null) { + this.produceAccumulator.start(); + } if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); @@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void shutdown() { this.defaultMQProducerImpl.shutdown(); + if (this.produceAccumulator != null) { + this.produceAccumulator.shutdown(); + } if (null != traceDispatcher) { traceDispatcher.shutdown(); } @@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic)); } + private boolean canBatch(Message msg) { + // produceAccumulator is full + if (!produceAccumulator.tryAddMessage(msg)) { + return false; + } + // delay message do not support batch processing + if (msg.getDelayTimeLevel() > 0) { + return false; + } + // retry message do not support batch processing + if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + return false; + } + // message which have been assigned to producer group do not support batch processing + if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) { + return false; + } + return true; + } + /** * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p> * @@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param msg Message to send. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - return this.defaultMQProducerImpl.send(msg); + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + return sendByAccumulator(msg, null, null); + } else { + return sendDirect(msg, null, null); + } } /** * Same to {@link #send(Message)} with send timeout specified in addition. * - * @param msg Message to send. + * @param msg Message to send. * @param timeout send timeout. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Send message to broker asynchronously. </p> - * + * <p> * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p> - * + * <p> * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and * application developers are the one to resolve this potential issue. * - * @param msg Message to send. + * @param msg Message to send. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQProducerImpl.send(msg, sendCallback); + try { + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + sendByAccumulator(msg, null, sendCallback); + } else { + sendDirect(msg, null, sendCallback); + } + } catch (Throwable e) { + sendCallback.onException(e); + } } /** * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition. * - * @param msg message to send. + * @param msg message to send. * @param sendCallback Callback to execute. - * @param timeout send timeout. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. * * @param msg Message to send. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Same to {@link #send(Message)} with target message queue specified in addition. * * @param msg Message to send. - * @param mq Target message queue. + * @param mq Target message queue. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq)); + mq = queueWithNamespace(mq); + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + return sendByAccumulator(msg, mq, null); + } else { + return sendDirect(msg, mq, null); + } } /** * Same to {@link #send(Message)} with target message queue and send timeout specified. * - * @param msg Message to send. - * @param mq Target message queue. + * @param msg Message to send. + * @param mq Target message queue. * @param timeout send timeout. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #send(Message, SendCallback)} with target message queue specified. * - * @param msg Message to send. - * @param mq Target message queue. + * @param msg Message to send. + * @param mq Target message queue. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback); + mq = queueWithNamespace(mq); + try { + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + sendByAccumulator(msg, mq, sendCallback); + } else { + sendDirect(msg, mq, sendCallback); + } + } catch (MQBrokerException e) { + // ignore + } } /** * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified. * - * @param msg Message to send. - * @param mq Target message queue. + * @param msg Message to send. + * @param mq Target message queue. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. - * @param timeout Send timeout. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout Send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Same to {@link #sendOneway(Message)} with target message queue specified. * * @param msg Message to send. - * @param mq Target message queue. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param mq Target message queue. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #send(Message)} with message queue selector specified. * - * @param msg Message to send. + * @param msg Message to send. * @param selector Message queue selector, through which we get target message queue to deliver message to. - * @param arg Argument to work along with message queue selector. + * @param arg Argument to work along with message queue selector. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - return this.defaultMQProducerImpl.send(msg, selector, arg); + MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout()); + mq = queueWithNamespace(mq); + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + return sendByAccumulator(msg, mq, null); + } else { + return sendDirect(msg, mq, null); + } } /** * Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified. * - * @param msg Message to send. + * @param msg Message to send. * @param selector Message queue selector, through which we get target message queue to deliver message to. - * @param arg Argument to work along with message queue selector. - * @param timeout Send timeout. + * @param arg Argument to work along with message queue selector. + * @param timeout Send timeout. * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any error with broker. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #send(Message, SendCallback)} with message queue selector specified. * - * @param msg Message to send. - * @param selector Message selector through which to get target message queue. - * @param arg Argument used along with message queue selector. + * @param msg Message to send. + * @param selector Message selector through which to get target message queue. + * @param arg Argument used along with message queue selector. * @param sendCallback callback to execute on sending completion. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); + try { + MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout()); + mq = queueWithNamespace(mq); + if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { + sendByAccumulator(msg, mq, sendCallback); + } else { + sendDirect(msg, mq, sendCallback); + } + } catch (Throwable e) { + sendCallback.onException(e); + } } /** * Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified. * - * @param msg Message to send. - * @param selector Message selector through which to get target message queue. - * @param arg Argument used along with message queue selector. + * @param msg Message to send. + * @param selector Message selector through which to get target message queue. + * @param arg Argument used along with message queue selector. * @param sendCallback callback to execute on sending completion. - * @param timeout Send timeout. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout Send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } + public SendResult sendDirect(Message msg, MessageQueue mq, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + // send in sync mode + if (sendCallback == null) { + if (mq == null) { + return this.defaultMQProducerImpl.send(msg); + } else { + return this.defaultMQProducerImpl.send(msg, mq); + } + } else { + if (mq == null) { + this.defaultMQProducerImpl.send(msg, sendCallback); + } else { + this.defaultMQProducerImpl.send(msg, mq, sendCallback); + } + return null; + } + } + + public SendResult sendByAccumulator(Message msg, MessageQueue mq, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + // check whether it can batch + if (!canBatch(msg)) { + return sendDirect(msg, mq, sendCallback); + } else { + Validators.checkMessage(msg, this); + MessageClientIDSetter.setUniqID(msg); + if (sendCallback == null) { + return this.produceAccumulator.send(msg, mq, this); + } else { + this.produceAccumulator.send(msg, mq, sendCallback, this); + return null; + } + } + } + /** * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p> * @@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially * delivered to broker(s). It's up to the application developers to resolve potential duplication issue. * - * @param msg request message to send + * @param msg request message to send * @param timeout request timeout * @return reply message - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any broker error. - * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. * @throws RequestTimeoutException if request timeout. */ @Override @@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Request asynchronously. </p> * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p> - * + * <p> * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and * application developers are the one to resolve this potential issue. * - * @param msg request message to send + * @param msg request message to send * @param requestCallback callback to execute on request completion. - * @param timeout request timeout - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout request timeout + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. - * @throws MQBrokerException if there is any broker error. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final RequestCallback requestCallback, final long timeout) @@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #request(Message, long)} with message queue selector specified. * - * @param msg request message to send + * @param msg request message to send * @param selector message queue selector, through which we get target message queue to deliver message to. - * @param arg argument to work along with message queue selector. - * @param timeout timeout of request. + * @param arg argument to work along with message queue selector. + * @param timeout timeout of request. * @return reply message - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any broker error. - * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. * @throws RequestTimeoutException if request timeout. */ @Override @@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified. * - * @param msg requst message to send - * @param selector message queue selector, through which we get target message queue to deliver message to. - * @param arg argument to work along with message queue selector. + * @param msg requst message to send + * @param selector message queue selector, through which we get target message queue to deliver message to. + * @param arg argument to work along with message queue selector. * @param requestCallback callback to execute on request completion. - * @param timeout timeout of request. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout timeout of request. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. - * @throws MQBrokerException if there is any broker error. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final MessageQueueSelector selector, final Object arg, @@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #request(Message, long)} with target message queue specified in addition. * - * @param msg request message to send - * @param mq target message queue. + * @param msg request message to send + * @param mq target message queue. * @param timeout request timeout - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. - * @throws MQBrokerException if there is any broker error. - * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. * @throws RequestTimeoutException if request timeout. */ @Override @@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified. * - * @param msg request message to send - * @param mq target message queue. + * @param msg request message to send + * @param mq target message queue. * @param requestCallback callback to execute on request completion. - * @param timeout timeout of request. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param timeout timeout of request. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the thread is interrupted. - * @throws MQBrokerException if there is any broker error. + * @throws MQBrokerException if there is any broker error. */ @Override public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) @@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Same to {@link #sendOneway(Message)} with message queue selector specified. * - * @param msg Message to send. + * @param msg Message to send. * @param selector Message queue selector, through which to determine target message queue to deliver message - * @param arg Argument used along with message queue selector. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @param arg Argument used along with message queue selector. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Override @@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * This method is to send transactional messages. * - * @param msg Transactional message to send. + * @param msg Transactional message to send. * @param tranExecuter local transaction executor. - * @param arg Argument used along with local transaction executor. + * @param arg Argument used along with local transaction executor. * @return Transaction result. * @throws MQClientException if there is any client error. */ @@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * - * @param key accessKey - * @param newTopic topic name - * @param queueNum topic's queue number + * @param key accessKey + * @param newTopic topic name + * @param queueNum topic's queue number * @param attributes * @throws MQClientException if there is any client error. */ @Deprecated @Override - public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException { + public void createTopic(String key, String newTopic, int queueNum, + Map<String, String> attributes) throws MQClientException { createTopic(key, withNamespace(newTopic), queueNum, 0, null); } @@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not * use this method. * - * @param key accessKey - * @param newTopic topic name - * @param queueNum topic's queue number + * @param key accessKey + * @param newTopic topic name + * @param queueNum topic's queue number * @param topicSysFlag topic system flag * @param attributes * @throws MQClientException if there is any client error. */ @Deprecated @Override - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException { + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, + Map<String, String> attributes) throws MQClientException { this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); } /** * Search consume queue offset of the given time stamp. * - * @param mq Instance of MessageQueue + * @param mq Instance of MessageQueue * @param timestamp from when in milliseconds. * @return Consume queue offset. * @throws MQClientException if there is any client error. @@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query maximum offset of the given message queue. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param mq Instance of MessageQueue @@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query minimum offset of the given message queue. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param mq Instance of MessageQueue @@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query the earliest message store time. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param mq Instance of MessageQueue @@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message of the given offset message ID. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param offsetMsgId message id * @return Message specified. - * @throws MQBrokerException if there is any broker error. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Deprecated @@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message by key. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * - * @param topic message topic - * @param key message key index word + * @param topic message topic + * @param key message key index word * @param maxNum max message number - * @param begin from when - * @param end to when + * @param begin from when + * @param end to when * @return QueryResult instance contains matched messages. - * @throws MQClientException if there is any client error. + * @throws MQClientException if there is any client error. * @throws InterruptedException if the thread is interrupted. */ @Deprecated @@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message of the given message ID. - * + * <p> * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param topic Topic * @param msgId Message ID * @return Message specified. - * @throws MQBrokerException if there is any broker error. - * @throws MQClientException if there is any client error. - * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ @Deprecated @@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } @Override - public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + public void send(Collection<Message> msgs, + SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.defaultMQProducerImpl.send(batch(msgs), sendCallback); } @@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void send(Collection<Message> msgs, MessageQueue mq, - SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout); } @@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return msgBatch; } + public int getBatchMaxDelayMs() { + if (this.produceAccumulator == null) { + return 0; + } + return produceAccumulator.getBatchMaxDelayMs(); + } + + public void batchMaxDelayMs(int holdMs) { + if (this.produceAccumulator == null) { + throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + } + this.produceAccumulator.batchMaxDelayMs(holdMs); + } + + public long getBatchMaxBytes() { + if (this.produceAccumulator == null) { + return 0; + } + return produceAccumulator.getBatchMaxBytes(); + } + + public void batchMaxBytes(long holdSize) { + if (this.produceAccumulator == null) { + throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + } + this.produceAccumulator.batchMaxBytes(holdSize); + } + + public long getTotalBatchMaxBytes() { + if (this.produceAccumulator == null) { + return 0; + } + return produceAccumulator.getTotalBatchMaxBytes(); + } + + public void totalBatchMaxBytes(long totalHoldSize) { + if (this.produceAccumulator == null) { + throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + } + this.produceAccumulator.totalBatchMaxBytes(totalHoldSize); + } + + public boolean getAutoBatch() { + if (this.produceAccumulator == null) { + return false; + } + return this.autoBatch; + } + + public void setAutoBatch(boolean autoBatch) { + if (this.produceAccumulator == null) { + throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + } + this.autoBatch = autoBatch; + } + public String getProducerGroup() { return producerGroup; } @@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } public boolean isEnableBackpressureForAsyncMode() { - return enableBackpressureForAsyncMode; + return enableBackpressureForAsyncMode; } public void setEnableBackpressureForAsyncMode(boolean enableBackpressureForAsyncMode) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index f70ddb283..78657e623 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin { RemotingException, MQBrokerException, InterruptedException; void send(final Message msg, final SendCallback sendCallback) throws MQClientException, - RemotingException, InterruptedException; + RemotingException, InterruptedException, MQBrokerException; void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException; @@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin { SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, + + void send(final Collection<Message> msgs, + final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + + void send(final Collection<Message> msgs, final SendCallback sendCallback, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, + + void send(final Collection<Message> msgs, final MessageQueue mq, + final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, + + void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - + //for rpc Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java new file mode 100644 index 000000000..46dfcf71d --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.producer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class ProduceAccumulator { + // totalHoldSize normal value + private long totalHoldSize = 32 * 1024 * 1024; + // holdSize normal value + private long holdSize = 32 * 1024; + // holdMs normal value + private int holdMs = 10; + private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class); + private final GuardForSyncSendService guardThreadForSyncSend; + private final GuardForAsyncSendService guardThreadForAsyncSend; + private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); + private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); + private AtomicLong currentlyHoldSize = new AtomicLong(0); + private final String instanceName; + + public ProduceAccumulator(String instanceName) { + this.instanceName = instanceName; + this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName); + this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName); + } + + private class GuardForSyncSendService extends ServiceThread { + private final String serviceName; + + public GuardForSyncSendService(String clientInstanceName) { + serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName); + } + + @Override public String getServiceName() { + return serviceName; + } + + @Override public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.doWork(); + } catch (Exception e) { + log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + log.info(this.getServiceName() + " service end"); + } + + private void doWork() throws InterruptedException { + Collection<MessageAccumulation> values = syncSendBatchs.values(); + final int sleepTime = Math.max(1, holdMs / 2); + for (MessageAccumulation v : values) { + v.wakeup(); + synchronized (v) { + synchronized (v.closed) { + if (v.messagesSize.get() == 0) { + v.closed.set(true); + syncSendBatchs.remove(v.aggregateKey, v); + } else { + v.notify(); + } + } + } + } + Thread.sleep(sleepTime); + } + } + + private class GuardForAsyncSendService extends ServiceThread { + private final String serviceName; + + public GuardForAsyncSendService(String clientInstanceName) { + serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName); + } + + @Override public String getServiceName() { + return serviceName; + } + + @Override public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.doWork(); + } catch (Exception e) { + log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + log.info(this.getServiceName() + " service end"); + } + + private void doWork() throws Exception { + Collection<MessageAccumulation> values = asyncSendBatchs.values(); + final int sleepTime = Math.max(1, holdMs / 2); + for (MessageAccumulation v : values) { + if (v.readyToSend()) { + v.send(null); + } + synchronized (v.closed) { + if (v.messagesSize.get() == 0) { + v.closed.set(true); + asyncSendBatchs.remove(v.aggregateKey, v); + } + } + } + Thread.sleep(sleepTime); + } + } + + void start() { + guardThreadForSyncSend.start(); + guardThreadForAsyncSend.start(); + } + + void shutdown() { + guardThreadForSyncSend.shutdown(); + guardThreadForAsyncSend.shutdown(); + } + + int getBatchMaxDelayMs() { + return holdMs; + } + + void batchMaxDelayMs(int holdMs) { + if (holdMs <= 0 || holdMs > 30 * 1000) { + throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs)); + } + this.holdMs = holdMs; + } + + long getBatchMaxBytes() { + return holdSize; + } + + void batchMaxBytes(long holdSize) { + if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) { + throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize)); + } + this.holdSize = holdSize; + } + + long getTotalBatchMaxBytes() { + return holdSize; + } + + void totalBatchMaxBytes(long totalHoldSize) { + if (totalHoldSize <= 0) { + throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize)); + } + this.totalHoldSize = totalHoldSize; + } + + private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey, + DefaultMQProducer defaultMQProducer) { + MessageAccumulation batch = syncSendBatchs.get(aggregateKey); + if (batch != null) { + return batch; + } + batch = new MessageAccumulation(aggregateKey, defaultMQProducer); + MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch); + + return previous == null ? batch : previous; + } + + private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey, + DefaultMQProducer defaultMQProducer) { + MessageAccumulation batch = asyncSendBatchs.get(aggregateKey); + if (batch != null) { + return batch; + } + batch = new MessageAccumulation(aggregateKey, defaultMQProducer); + MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch); + + return previous == null ? batch : previous; + } + + SendResult send(Message msg, + DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + AggregateKey partitionKey = new AggregateKey(msg); + while (true) { + MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer); + int index = batch.add(msg); + if (index == -1) { + syncSendBatchs.remove(partitionKey, batch); + } else { + return batch.sendResults[index]; + } + } + } + + SendResult send(Message msg, MessageQueue mq, + DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + AggregateKey partitionKey = new AggregateKey(msg, mq); + while (true) { + MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer); + int index = batch.add(msg); + if (index == -1) { + syncSendBatchs.remove(partitionKey, batch); + } else { + return batch.sendResults[index]; + } + } + } + + void send(Message msg, SendCallback sendCallback, + DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException { + AggregateKey partitionKey = new AggregateKey(msg); + while (true) { + MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer); + if (!batch.add(msg, sendCallback)) { + asyncSendBatchs.remove(partitionKey, batch); + } else { + return; + } + } + } + + void send(Message msg, MessageQueue mq, + SendCallback sendCallback, + DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException { + AggregateKey partitionKey = new AggregateKey(msg, mq); + while (true) { + MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer); + if (!batch.add(msg, sendCallback)) { + asyncSendBatchs.remove(partitionKey, batch); + } else { + return; + } + } + } + + boolean tryAddMessage(Message message) { + synchronized (currentlyHoldSize) { + if (currentlyHoldSize.get() < totalHoldSize) { + currentlyHoldSize.addAndGet(message.getBody().length); + return true; + } else { + return false; + } + } + } + + private class AggregateKey { + public String topic = null; + public MessageQueue mq = null; + public boolean waitStoreMsgOK = false; + public String tag = null; + + public AggregateKey(Message message) { + this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags()); + } + + public AggregateKey(Message message, MessageQueue mq) { + this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags()); + } + + public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) { + this.topic = topic; + this.mq = mq; + this.waitStoreMsgOK = waitStoreMsgOK; + this.tag = tag; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + AggregateKey key = (AggregateKey) o; + return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag); + } + + @Override public int hashCode() { + return Objects.hash(topic, mq, waitStoreMsgOK, tag); + } + } + + private class MessageAccumulation { + private final DefaultMQProducer defaultMQProducer; + private LinkedList<Message> messages; + private LinkedList<SendCallback> sendCallbacks; + private Set<String> keys; + private AtomicBoolean closed; + private SendResult[] sendResults; + private AggregateKey aggregateKey; + private AtomicInteger messagesSize; + private int count; + private long createTime; + + public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) { + this.defaultMQProducer = defaultMQProducer; + this.messages = new LinkedList<Message>(); + this.sendCallbacks = new LinkedList<SendCallback>(); + this.keys = new HashSet<String>(); + this.closed = new AtomicBoolean(false); + this.messagesSize = new AtomicInteger(0); + this.aggregateKey = aggregateKey; + this.count = 0; + this.createTime = System.currentTimeMillis(); + } + + private boolean readyToSend() { + if (this.messagesSize.get() > holdSize + || System.currentTimeMillis() >= this.createTime + holdMs) { + return true; + } + return false; + } + + public int add( + Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + int ret = -1; + synchronized (this.closed) { + if (this.closed.get()) { + return ret; + } + ret = this.count++; + this.messages.add(msg); + messagesSize.addAndGet(msg.getBody().length); + String msgKeys = msg.getKeys(); + if (msgKeys != null) { + this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR))); + } + } + synchronized (this) { + while (!this.closed.get()) { + if (readyToSend()) { + this.send(); + break; + } else { + this.wait(); + } + } + return ret; + } + } + + public boolean add(Message msg, + SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException { + synchronized (this.closed) { + if (this.closed.get()) { + return false; + } + this.count++; + this.messages.add(msg); + this.sendCallbacks.add(sendCallback); + messagesSize.getAndAdd(msg.getBody().length); + } + if (readyToSend()) { + this.send(sendCallback); + } + return true; + + } + + public synchronized void wakeup() { + if (this.closed.get()) { + return; + } + this.notify(); + } + + private MessageBatch batch() { + MessageBatch messageBatch = new MessageBatch(this.messages); + messageBatch.setTopic(this.aggregateKey.topic); + messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK); + messageBatch.setKeys(this.keys); + messageBatch.setTags(this.aggregateKey.tag); + MessageClientIDSetter.setUniqID(messageBatch); + messageBatch.setBody(MessageDecoder.encodeMessages(this.messages)); + return messageBatch; + } + + private void splitSendResults(SendResult sendResult) { + if (sendResult == null) { + throw new IllegalArgumentException("sendResult is null"); + } + boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(","); + this.sendResults = new SendResult[this.count]; + if (!isBatchConsumerQueue) { + String[] msgIds = sendResult.getMsgId().split(","); + String[] offsetMsgIds = sendResult.getOffsetMsgId().split(","); + if (offsetMsgIds.length != this.count || msgIds.length != this.count) { + throw new IllegalArgumentException("sendResult is illegal"); + } + for (int i = 0; i < this.count; i++) { + this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i], + sendResult.getMessageQueue(), sendResult.getQueueOffset() + i, + sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId()); + } + } else { + for (int i = 0; i < this.count; i++) { + this.sendResults[i] = sendResult; + } + } + } + + private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { + synchronized (this.closed) { + if (this.closed.getAndSet(true)) { + return; + } + } + MessageBatch messageBatch = this.batch(); + SendResult sendResult = null; + try { + if (defaultMQProducer != null) { + sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null); + this.splitSendResults(sendResult); + } else { + throw new IllegalArgumentException("defaultMQProducer is null, can not send message"); + } + } finally { + currentlyHoldSize.addAndGet(-messagesSize.get()); + this.notifyAll(); + } + } + + private void send(SendCallback sendCallback) { + synchronized (this.closed) { + if (this.closed.getAndSet(true)) { + return; + } + } + MessageBatch messageBatch = this.batch(); + SendResult sendResult = null; + try { + if (defaultMQProducer != null) { + final int size = messagesSize.get(); + defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() { + @Override public void onSuccess(SendResult sendResult) { + try { + splitSendResults(sendResult); + int i = 0; + Iterator<SendCallback> it = sendCallbacks.iterator(); + while (it.hasNext()) { + SendCallback v = it.next(); + v.onSuccess(sendResults[i++]); + } + if (i != count) { + throw new IllegalArgumentException("sendResult is illegal"); + } + currentlyHoldSize.addAndGet(-size); + } catch (Exception e) { + onException(e); + } + } + + @Override public void onException(Throwable e) { + for (SendCallback v : sendCallbacks) { + v.onException(e); + } + currentlyHoldSize.addAndGet(-size); + } + }); + } else { + throw new IllegalArgumentException("defaultMQProducer is null, can not send message"); + } + } catch (Exception e) { + for (SendCallback v : sendCallbacks) { + v.onException(e); + } + } + } + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 658f22ab0..d4153c7cd 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -250,7 +250,7 @@ public class DefaultMQProducerTest { @Test public void testBatchSendMessageAsync() - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(4); @@ -504,6 +504,42 @@ public class DefaultMQProducerTest { assertThat(cc.get()).isEqualTo(1); } + @Test + public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.setAutoBatch(true); + producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + countDownLatch.countDown(); + } + + @Override + public void onException(Throwable e) { + countDownLatch.countDown(); + } + }); + + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + producer.setAutoBatch(false); + } + + @Test + public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + producer.setAutoBatch(true); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + SendResult sendResult = producer.send(message); + + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + producer.setAutoBatch(false); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java new file mode 100644 index 000000000..7074fae24 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.producer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ProduceAccumulatorTest { + private boolean compareMessageBatch(MessageBatch a, MessageBatch b) { + if (!a.getTopic().equals(b.getTopic())) { + return false; + } + if (!Arrays.equals(a.getBody(), b.getBody())) { + return false; + } + return true; + } + + private class MockMQProducer extends DefaultMQProducer { + private Message beSendMessage = null; + private MessageQueue beSendMessageQueue = null; + + @Override + public SendResult sendDirect(Message msg, MessageQueue mq, + SendCallback sendCallback) { + this.beSendMessage = msg; + this.beSendMessageQueue = mq; + + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + if (sendCallback != null) { + sendCallback.onSuccess(sendResult); + } + return sendResult; + } + } + + @Test + public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + MockMQProducer mockMQProducer = new MockMQProducer(); + + ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); + produceAccumulator.start(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + List<Message> messages = new ArrayList<Message>(); + messages.add(new Message("testTopic", "1".getBytes())); + messages.add(new Message("testTopic", "22".getBytes())); + messages.add(new Message("testTopic", "333".getBytes())); + messages.add(new Message("testTopic", "4444".getBytes())); + messages.add(new Message("testTopic", "55555".getBytes())); + for (Message message : messages) { + produceAccumulator.send(message, new SendCallback() { + final CountDownLatch finalCountDownLatch = countDownLatch; + + @Override + public void onSuccess(SendResult sendResult) { + finalCountDownLatch.countDown(); + } + + @Override + public void onException(Throwable e) { + finalCountDownLatch.countDown(); + } + }, mockMQProducer); + } + assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); + assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue(); + + MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage; + MessageBatch messageBatch2 = MessageBatch.generateFromList(messages); + messageBatch2.setBody(messageBatch2.encode()); + + assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue(); + } + + @Test + public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + final MockMQProducer mockMQProducer = new MockMQProducer(); + + final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); + produceAccumulator.start(); + + List<Message> messages = new ArrayList<Message>(); + messages.add(new Message("testTopic", "1".getBytes())); + messages.add(new Message("testTopic", "22".getBytes())); + messages.add(new Message("testTopic", "333".getBytes())); + messages.add(new Message("testTopic", "4444".getBytes())); + messages.add(new Message("testTopic", "55555".getBytes())); + final CountDownLatch countDownLatch = new CountDownLatch(messages.size()); + + for (final Message message : messages) { + new Thread(new Runnable() { + final ProduceAccumulator finalProduceAccumulator = produceAccumulator; + final CountDownLatch finalCountDownLatch = countDownLatch; + final MockMQProducer finalMockMQProducer = mockMQProducer; + final Message finalMessage = message; + + @Override + public void run() { + try { + finalProduceAccumulator.send(finalMessage, finalMockMQProducer); + finalCountDownLatch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + } + assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); + assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue(); + + MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage; + MessageBatch messageBatch2 = MessageBatch.generateFromList(messages); + messageBatch2.setBody(messageBatch2.encode()); + + assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic()); + // The execution order is uncertain, just compare the length + assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length); + } + + @Test + public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + MockMQProducer mockMQProducer = new MockMQProducer(); + + MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0); + final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); + produceAccumulator.start(); + + Message message = new Message("testTopic", "1".getBytes()); + produceAccumulator.send(message, messageQueue, mockMQProducer); + assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + produceAccumulator.send(message, messageQueue, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + countDownLatch.countDown(); + } + + @Override + public void onException(Throwable e) { + countDownLatch.countDown(); + } + }, mockMQProducer); + assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); + assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java index a423048c5..30369b8f3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable<Message> { private static final long serialVersionUID = 621335151046335557L; private final List<Message> messages; - private MessageBatch(List<Message> messages) { + public MessageBatch(List<Message> messages) { this.messages = messages; } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2