Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch028-backport-Fix-proxy-cl...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch028-backport-Fix-proxy-client-language-error.patch of Package rocketmq
From 3968c186a59db96701ade8c343bc6a5d31ee2d24 Mon Sep 17 00:00:00 2001 From: weihubeats <weihu@apache.org> Date: Fri, 20 Oct 2023 14:49:00 +0800 Subject: [PATCH 1/2] [ISSUE #7231] Fix: proxy client language error (#7200) * Adding null does not update * add langeuga code * add langeuga code * add langeuga code * add langeuga code * add langeuga code * Rerun ci * Rerun ci * Rerun ci * remove redundant package imports * redundant line * modify the parameter passed as proxyContext to language * format --- .../proxy/service/message/LocalMessageService.java | 12 ++++++------ .../proxy/service/message/LocalRemotingCommand.java | 8 ++++++-- .../rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index ca7dcc9eb..aaa688fee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService { body = message.getBody(); messageId = MessageClientIDSetter.getUniqID(message); } - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage()); request.setBody(body); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); @@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService { ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getSendMessageProcessor() @@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService { CompletableFuture<Void> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage()); try { brokerController.getEndTransactionProcessor() .processRequest(channelHandlerContext, command); @@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService { public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PopMessageRequestHeader requestHeader, long timeoutMillis) { requestHeader.setBornTime(System.currentTimeMillis()); - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); InvocationContext invocationContext = new InvocationContext(future); @@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService { ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor() @@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService { AckMessageRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getAckMessageProcessor() diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java index 73048dbbc..915cafcd5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java @@ -16,16 +16,19 @@ */ package org.apache.rocketmq.proxy.service.message; -import java.util.HashMap; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.HashMap; + public class LocalRemotingCommand extends RemotingCommand { - public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) { LocalRemotingCommand cmd = new LocalRemotingCommand(); cmd.setCode(code); + cmd.setLanguage(LanguageCode.getCode(language)); cmd.writeCustomHeader(customHeader); cmd.setExtFields(new HashMap<>()); setCmdVersion(cmd); @@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand { Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException { return classHeader.cast(readCustomHeader()); } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index 19280f996..2df9fbf02 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -17,6 +17,11 @@ package org.apache.rocketmq.remoting.protocol; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + public enum LanguageCode { JAVA((byte) 0), CPP((byte) 1), @@ -50,4 +55,10 @@ public enum LanguageCode { public byte getCode() { return code; } + + private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity())); + + public static LanguageCode getCode(String language) { + return MAP.get(language); + } } -- 2.32.0.windows.2 From 8f020b397a3afdd75429ae91e3624812b8ffc9e1 Mon Sep 17 00:00:00 2001 From: Ao Qiao <qiao_ao@foxmail.com> Date: Mon, 23 Oct 2023 16:34:10 +0800 Subject: [PATCH 2/2] [ISSUE #7489] Code comment enhancement in example (#7490) * Doc: How to debug in Idea * en * enhance code comment --- .../java/org/apache/rocketmq/example/simple/PullConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index 5ac8d247d..e1a02aa26 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -75,7 +75,7 @@ public class PullConsumer { if (msgs != null && !msgs.isEmpty()) { this.doSomething(msgs); - //update offset to broker + //update offset to local memory, eventually to broker consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); //print pull tps this.incPullTPS(topic, pullResult.getMsgFoundList().size()); -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2