Projects
home:Eustace:branches:Eulaceura:Factory
rocketmq
_service:obs_scm:patch025-backport-Fix-channel-...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch025-backport-Fix-channel-connect-issue.patch of Package rocketmq
From d73b6013825db9124e39a37db67094e34b9c3d88 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Mon, 16 Oct 2023 19:06:40 +0800 Subject: [PATCH] [ISSUE #7330] Fix channel connect issue for goaway (#7467) * add waitChannelFuture for goaway * add body for retry channel --- .../remoting/netty/NettyRemotingClient.java | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 4bc51bd83..340daee67 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -716,20 +716,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } if (cw != null) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { - if (cw.isOK()) { - LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString()); - } + return waitChannelFuture(addr, cw); + } + + return null; + } + + private Channel waitChannelFuture(String addr, ChannelWrapper cw) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { + if (cw.isOK()) { + LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); } else { - LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); + LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); } + } else { + LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), + channelFuture.toString()); } - return null; } @@ -818,8 +823,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); stopwatch.stop(); RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); - Channel retryChannel = channelWrapper.getChannel(); - if (channel != retryChannel) { + retryRequest.setBody(request.getBody()); + Channel retryChannel; + if (channelWrapper.isOK()) { + retryChannel = channelWrapper.getChannel(); + } else { + retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper); + } + if (retryChannel != null && channel != retryChannel) { return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); } } @@ -994,6 +1005,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.lastResponseTime = System.currentTimeMillis(); } + public String getChannelAddress() { + return channelAddress; + } + public boolean reconnect() { if (lock.writeLock().tryLock()) { try { -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2