Projects
Eulaceura:Factory
rocketmq
_service:obs_scm:patch004-backport-Support-Prox...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch of Package rocketmq
From 955428278ccd9bfa0f15e21a8d3040c5213358bd Mon Sep 17 00:00:00 2001 From: Dongyuan Pan <dongyuanpan0@gmail.com> Date: Tue, 4 Jul 2023 18:01:48 +0800 Subject: [PATCH 1/5] [ISSUE #6991] Delete rocketmq.client.logUseSlf4j=true in JAVA_OPT --- distribution/bin/runbroker.cmd | 1 - distribution/bin/runbroker.sh | 1 - 2 files changed, 2 deletions(-) diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd index 15f676aa8..77a0d1ff8 100644 --- a/distribution/bin/runbroker.cmd +++ b/distribution/bin/runbroker.cmd @@ -36,7 +36,6 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow" set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch" set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g" set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking" -set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true" set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%" "%JAVA%" %JAVA_OPT% %* \ No newline at end of file diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh index a081df79e..e6e2132ab 100644 --- a/distribution/bin/runbroker.sh +++ b/distribution/bin/runbroker.sh @@ -106,7 +106,6 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" -JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" -- 2.32.0.windows.2 From 00fc42b8be848fc3f5c550cbab007b92f128dc38 Mon Sep 17 00:00:00 2001 From: ShuangxiDing <dingshuangxi888@gmail.com> Date: Tue, 4 Jul 2023 18:02:16 +0800 Subject: [PATCH 2/5] [ISSUE #6957] Support Proxy Protocol for gRPC and Remoting Server (#6958) --- WORKSPACE | 1 + .../common/constant/HAProxyConstants.java | 28 ++++ pom.xml | 5 + proxy/BUILD.bazel | 2 + proxy/pom.xml | 4 + .../proxy/grpc/GrpcServerBuilder.java | 2 +- ...ava => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++-- .../proxy/grpc/constant/AttributeKeys.java | 44 ++++++ .../grpc/interceptor/HeaderInterceptor.java | 32 +++- .../remoting/MultiProtocolRemotingServer.java | 5 +- .../remoting/common/RemotingHelper.java | 42 ++++-- .../remoting/netty/AttributeKeys.java | 45 ++++++ .../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++-- .../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++ .../org/apache/rocketmq/remoting/TlsTest.java | 28 ++-- 15 files changed, 563 insertions(+), 59 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java rename proxy/src/main/java/org/apache/rocketmq/proxy/grpc/{OptionalSSLProtocolNegotiator.java => ProxyAndTlsProtocolNegotiator.java} (51%) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java diff --git a/WORKSPACE b/WORKSPACE index fbb694efe..e3a8f37dc 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -104,6 +104,7 @@ maven_install( "software.amazon.awssdk:s3:2.20.29", "com.fasterxml.jackson.core:jackson-databind:2.13.4.2", "com.adobe.testing:s3mock-junit4:2.11.0", + "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0", ], fetch_sources = True, repositories = [ diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java new file mode 100644 index 000000000..c1ae0cca1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.constant; + +public class HAProxyConstants { + + public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_"; + public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + "addr"; + public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + "port"; + public static final String PROXY_PROTOCOL_SERVER_ADDR = PROXY_PROTOCOL_PREFIX + "server_addr"; + public static final String PROXY_PROTOCOL_SERVER_PORT = PROXY_PROTOCOL_PREFIX + "server_port"; + public static final String PROXY_PROTOCOL_TLV_PREFIX = PROXY_PROTOCOL_PREFIX + "tlv_0x"; +} diff --git a/pom.xml b/pom.xml index a3b474602..12bc2dbd5 100644 --- a/pom.xml +++ b/pom.xml @@ -888,6 +888,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.github.aliyunmq</groupId> + <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId> + <version>1.0.0</version> + </dependency> <dependency> <groupId>com.conversantmedia</groupId> <artifactId>disruptor</artifactId> diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel index fcb85e46f..b4f3c16e2 100644 --- a/proxy/BUILD.bazel +++ b/proxy/BUILD.bazel @@ -46,6 +46,7 @@ java_library( "@maven//:io_grpc_grpc_services", "@maven//:io_grpc_grpc_stub", "@maven//:io_netty_netty_all", + "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy", "@maven//:io_openmessaging_storage_dledger", "@maven//:io_opentelemetry_opentelemetry_api", "@maven//:io_opentelemetry_opentelemetry_exporter_otlp", @@ -94,6 +95,7 @@ java_library( "@maven//:io_grpc_grpc_netty_shaded", "@maven//:io_grpc_grpc_stub", "@maven//:io_netty_netty_all", + "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy", "@maven//:org_apache_commons_commons_lang3", "@maven//:io_opentelemetry_opentelemetry_exporter_otlp", "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus", diff --git a/proxy/pom.xml b/proxy/pom.xml index f14155737..3fbea107a 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -75,6 +75,10 @@ <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java-util</artifactId> </dependency> + <dependency> + <groupId>io.github.aliyunmq</groupId> + <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId> + </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index 0ca6a1fcb..437b9216b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -50,7 +50,7 @@ public class GrpcServerBuilder { protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { serverBuilder = NettyServerBuilder.forPort(port); - serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator()); + serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); // build server int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java similarity index 51% rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java index 670e1c1a2..ceb9becc0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java @@ -16,36 +16,53 @@ */ package org.apache.rocketmq.proxy.grpc; +import io.grpc.Attributes; import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators; +import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent; import io.grpc.netty.shaded.io.netty.buffer.ByteBuf; import io.grpc.netty.shaded.io.netty.channel.ChannelHandler; import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; +import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter; import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder; +import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult; +import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState; +import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage; +import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion; import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler; import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; import io.grpc.netty.shaded.io.netty.util.AsciiString; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; +import io.grpc.netty.shaded.io.netty.util.CharsetUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; -public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final String HA_PROXY_DECODER = "HAProxyDecoder"; + private static final String HA_PROXY_HANDLER = "HAProxyHandler"; + private static final String TLS_MODE_HANDLER = "TlsModeHandler"; /** * the length of the ssl record header (in bytes) */ @@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator private static SslContext sslContext; - public OptionalSSLProtocolNegotiator() { + public ProxyAndTlsProtocolNegotiator() { sslContext = loadSslContext(); } @@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { - return new PortUnificationServerHandler(grpcHandler); + return new ProxyAndTlsProtocolHandler(grpcHandler); } @Override - public void close() {} + public void close() { + } private static SslContext loadSslContext() { try { @@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath(); try (InputStream serverKeyInputStream = Files.newInputStream( Paths.get(tlsKeyPath)); - InputStream serverCertificateStream = Files.newInputStream( - Paths.get(tlsCertPath))) { + InputStream serverCertificateStream = Files.newInputStream( + Paths.get(tlsCertPath))) { SslContext res = GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream) .trustManager(InsecureTrustManagerFactory.INSTANCE) @@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator } } - public static class PortUnificationServerHandler extends ByteToMessageDecoder { + private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder { + + private final GrpcHttp2ConnectionHandler grpcHandler; + + public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) { + this.grpcHandler = grpcHandler; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { + try { + ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol( + in); + if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + return; + } + if (ha.state() == ProtocolDetectionState.DETECTED) { + ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) + .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) + .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler)); + } else { + ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler)); + } + + ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); + ctx.pipeline().remove(this); + } catch (Exception e) { + log.error("process proxy protocol negotiator failed.", e); + throw e; + } + } + } + + private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { + + private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HAProxyMessage) { + replaceEventWithMessage((HAProxyMessage) msg); + ctx.fireUserEventTriggered(pne); + } else { + super.channelRead(ctx, msg); + } + ctx.pipeline().remove(this); + } + + /** + * The definition of key refers to the implementation of nginx + * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a> + * + * @param msg + */ + private void replaceEventWithMessage(HAProxyMessage msg) { + Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); + if (StringUtils.isNotBlank(msg.sourceAddress())) { + builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); + } + if (msg.sourcePort() > 0) { + builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); + } + if (StringUtils.isNotBlank(msg.destinationAddress())) { + builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); + } + if (msg.destinationPort() > 0) { + builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { + Attributes.Key<String> key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + builder.set(key, value); + }); + } + pne = InternalProtocolNegotiationEvent + .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); + } + } + + private static class TlsModeHandler extends ByteToMessageDecoder { + + private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); private final ChannelHandler ssl; private final ChannelHandler plaintext; - public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) { + public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) { this.ssl = InternalProtocolNegotiators.serverTls(sslContext) .newHandler(grpcHandler); this.plaintext = InternalProtocolNegotiators.serverPlaintext() @@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) - throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { TlsMode tlsMode = TlsSystemConfig.tlsMode; if (TlsMode.ENFORCING.equals(tlsMode)) { @@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator ctx.pipeline().addAfter(ctx.name(), null, this.plaintext); } } - ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); + ctx.fireUserEventTriggered(pne); ctx.pipeline().remove(this); } catch (Exception e) { log.error("process ssl protocol negotiator failed.", e); throw e; } } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ProtocolNegotiationEvent) { + pne = (ProtocolNegotiationEvent) evt; + } else { + super.userEventTriggered(ctx, evt); + } + } } } \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java new file mode 100644 index 000000000..096a5ba3d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.grpc.constant; + +import io.grpc.Attributes; +import org.apache.rocketmq.common.constant.HAProxyConstants; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class AttributeKeys { + + public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR = + Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR); + + public static final Attributes.Key<String> PROXY_PROTOCOL_PORT = + Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT); + + public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR = + Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR); + + public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT = + Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT); + + private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>(); + + public static Attributes.Key<String> valueOf(String name) { + return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> Attributes.Key.create(name)); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java index 1cbb00361..13893e5ed 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java @@ -18,11 +18,16 @@ package org.apache.rocketmq.proxy.grpc.interceptor; import com.google.common.net.HostAndPort; +import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.HAProxyConstants; +import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; + import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor { Metadata headers, ServerCallHandler<R, W> next ) { - SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - String remoteAddress = parseSocketAddress(remoteSocketAddress); + String remoteAddress = getProxyProtocolAddress(call.getAttributes()); + if (StringUtils.isBlank(remoteAddress)) { + SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + remoteAddress = parseSocketAddress(remoteSocketAddress); + } headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress); SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); String localAddress = parseSocketAddress(localSocketAddress); headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress); + + for (Attributes.Key<?> key : call.getAttributes().keys()) { + if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) { + continue; + } + Metadata.Key<String> headerKey + = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER); + String headerValue = String.valueOf(call.getAttributes().get(key)); + headers.put(headerKey, headerValue); + } + return next.startCall(call, headers); } @@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor { return ""; } + + private String getProxyProtocolAddress(Attributes attributes) { + String proxyProtocolAddr = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR); + String proxyProtocolPort = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT); + if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) { + return null; + } + return proxyProtocolAddr + ":" + proxyProtocolPort; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java index 1142132b7..858b1f022 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import java.io.IOException; -import java.security.cert.CertificateException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; +import java.io.IOException; +import java.security.cert.CertificateException; + /** * support remoting and http2 protocol at one port */ diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index 75e25a83a..d0750b678 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.util.Attribute; import io.netty.util.AttributeKey; -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; + public class RemotingHelper { public static final String DEFAULT_CHARSET = "UTF-8"; public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0"; @@ -50,6 +53,9 @@ public class RemotingHelper { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); + private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); + private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); + public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version"); @@ -203,12 +209,16 @@ public class RemotingHelper { if (null == channel) { return ""; } + String addr = getProxyProtocolAddress(channel); + if (StringUtils.isNotBlank(addr)) { + return addr; + } Attribute<String> att = channel.attr(REMOTE_ADDR_KEY); if (att == null) { // mocked in unit test return parseChannelRemoteAddr0(channel); } - String addr = att.get(); + addr = att.get(); if (addr == null) { addr = parseChannelRemoteAddr0(channel); att.set(addr); @@ -216,6 +226,18 @@ public class RemotingHelper { return addr; } + private static String getProxyProtocolAddress(Channel channel) { + if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) { + return null; + } + String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel); + String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel); + if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) { + return null; + } + return proxyProtocolAddr + ":" + proxyProtocolPort; + } + private static String parseChannelRemoteAddr0(final Channel channel) { SocketAddress remote = channel.remoteAddress(); final String addr = remote != null ? remote.toString() : ""; @@ -255,7 +277,7 @@ public class RemotingHelper { return ""; } - public static int parseSocketAddressPort(SocketAddress socketAddress) { + public static Integer parseSocketAddressPort(SocketAddress socketAddress) { if (socketAddress instanceof InetSocketAddress) { return ((InetSocketAddress) socketAddress).getPort(); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java new file mode 100644 index 000000000..4e69ab82d --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + + +import io.netty.util.AttributeKey; +import org.apache.rocketmq.common.constant.HAProxyConstants; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class AttributeKeys { + + public static final AttributeKey<String> PROXY_PROTOCOL_ADDR = + AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); + + public static final AttributeKey<String> PROXY_PROTOCOL_PORT = + AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); + + public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR = + AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR); + + public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT = + AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT); + + private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>(); + + public static AttributeKey<String> valueOf(String name) { + return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 9f39d672e..94ffd8d07 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.ProtocolDetectionResult; +import io.netty.handler.codec.ProtocolDetectionState; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.AttributeKey; +import io.netty.util.CharsetUtil; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.cert.CertificateException; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -71,6 +70,19 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + @SuppressWarnings("NullableProblems") public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); @@ -96,6 +108,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>(); public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + public static final String HA_PROXY_DECODER = "HAProxyDecoder"; + public static final String HA_PROXY_HANDLER = "HAProxyHandler"; + public static final String TLS_MODE_HANDLER = "TlsModeHandler"; public static final String TLS_HANDLER_NAME = "sslHandler"; public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; @@ -387,7 +402,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } private void prepareSharableHandlers() { - handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); + handshakeHandler = new HandshakeHandler(); encoder = new NettyEncoder(); connectionManageHandler = new NettyConnectManageHandler(); serverHandler = new NettyServerHandler(); @@ -437,11 +452,51 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @ChannelHandler.Sharable public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { + private final TlsModeHandler tlsModeHandler; + + public HandshakeHandler() { + tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { + try { + ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in); + if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + return; + } + if (ha.state() == ProtocolDetectionState.DETECTED) { + ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) + .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) + .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); + } else { + ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler); + } + + try { + // Remove this handler + ctx.pipeline().remove(this); + } catch (NoSuchElementException e) { + log.error("Error while removing HandshakeHandler", e); + } + + // Hand over this message to the next . + ctx.fireChannelRead(in.retain()); + } catch (Exception e) { + log.error("process proxy protocol negotiator failed.", e); + throw e; + } + } + } + + @ChannelHandler.Sharable + public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> { + private final TlsMode tlsMode; private static final byte HANDSHAKE_MAGIC_CODE = 0x16; - HandshakeHandler(TlsMode tlsMode) { + TlsModeHandler(TlsMode tlsMode) { this.tlsMode = tlsMode; } @@ -461,7 +516,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti case ENFORCING: if (null != sslContext) { ctx.pipeline() - .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) + .addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); log.info("Handlers prepended to channel pipeline to establish SSL connection"); } else { @@ -483,7 +538,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti // Remove this handler ctx.pipeline().remove(this); } catch (NoSuchElementException e) { - log.error("Error while removing HandshakeHandler", e); + log.error("Error while removing TlsModeHandler", e); } // Hand over this message to the next . @@ -706,4 +761,46 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return NettyRemotingServer.this.getCallbackExecutor(); } } + + public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HAProxyMessage) { + fillChannelWithMessage((HAProxyMessage) msg, ctx.channel()); + } else { + super.channelRead(ctx, msg); + } + ctx.pipeline().remove(this); + } + + /** + * The definition of key refers to the implementation of nginx + * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a> + * @param msg + * @param channel + */ + private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) { + if (StringUtils.isNotBlank(msg.sourceAddress())) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); + } + if (msg.sourcePort() > 0) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); + } + if (StringUtils.isNotBlank(msg.destinationAddress())) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); + } + if (msg.destinationPort() > 0) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { + AttributeKey<String> key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + channel.attr(key).set(value); + }); + } + } + } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java new file mode 100644 index 000000000..c39fd2132 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.handler.codec.haproxy.HAProxyCommand; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.Socket; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertNotNull; + +@RunWith(MockitoJUnitRunner.class) +public class ProxyProtocolTest { + + private RemotingServer remotingServer; + private RemotingClient remotingClient; + + @Before + public void setUp() throws Exception { + NettyClientConfig clientConfig = new NettyClientConfig(); + clientConfig.setUseTLS(false); + + remotingServer = RemotingServerTest.createRemotingServer(); + remotingClient = RemotingServerTest.createRemotingClient(clientConfig); + + await().pollDelay(Duration.ofMillis(10)) + .pollInterval(Duration.ofMillis(10)) + .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress())); + } + + @Test + public void testProxyProtocol() throws Exception { + sendHAProxyMessage(remotingClient); + requestThenAssertResponse(remotingClient); + } + + private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception { + RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3); + assertNotNull(response); + assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA); + assertThat(response.getExtFields()).hasSize(2); + assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome"); + } + + private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception { + Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class); + getAndCreateChannel.setAccessible(true); + NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient; + Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress()); + HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, + HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000); + + ByteBuf byteBuf = Unpooled.directBuffer(); + Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class); + encode.setAccessible(true); + encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf); + channel.writeAndFlush(byteBuf).sync(); + } + + private static RemotingCommand createRequest() { + RequestHeader requestHeader = new RequestHeader(); + requestHeader.setCount(1); + requestHeader.setMessageTitle("Welcome"); + return RemotingCommand.createRequestCommand(0, requestHeader); + } + + + private String getServerAddress() { + return "localhost:" + remotingServer.localListenPort(); + } + + private boolean isHostConnectable(String addr) { + try (Socket socket = new Socket()) { + socket.connect(NetworkUtil.string2SocketAddress(addr)); + return true; + } catch (IOException ignored) { + } + return false; + } +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java index 3da7abf57..de7edbbfb 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java @@ -17,19 +17,6 @@ package org.apache.rocketmq.remoting; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.net.Socket; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -47,6 +34,20 @@ import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.Socket; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER; import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH; import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD; @@ -234,6 +235,7 @@ public class TlsTest { @Test public void serverAcceptsUntrustedClientCert() throws Exception { requestThenAssertResponse(); +// Thread.sleep(1000000L); } /** -- 2.32.0.windows.2 From 4f840afcb04f5cc328795896198c6fba96ff37ec Mon Sep 17 00:00:00 2001 From: mxsm <ljbmxsm@gmail.com> Date: Wed, 5 Jul 2023 11:03:52 +0800 Subject: [PATCH 3/5] [ISSUE #6960] Added Slot formatting sketch comments (#6961) --- .../java/org/apache/rocketmq/store/timer/Slot.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java index b91193b94..2da846cee 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java @@ -16,9 +16,17 @@ */ package org.apache.rocketmq.store.timer; +/** + * Represents a slot of timing wheel. Format: + * ┌────────────┬───────────┬───────────┬───────────┬───────────┐ + * │delayed time│ first pos │ last pos │ num │ magic │ + * ├────────────┼───────────┼───────────┼───────────┼───────────┤ + * │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │ + * └────────────┴───────────┴───────────┴───────────┴───────────┘ + */ public class Slot { public static final short SIZE = 32; - public final long timeMs; + public final long timeMs; //delayed time public final long firstPos; public final long lastPos; public final int num; -- 2.32.0.windows.2 From 58550f074ec101c0a158ede0df1839950e08837a Mon Sep 17 00:00:00 2001 From: rongtong <jinrongtong5@163.com> Date: Mon, 10 Jul 2023 14:13:18 +0800 Subject: [PATCH 4/5] [ISSUE #7008] Fix the issue of protocol parsing failure when using haproxy and tls together (#7009) --- .../remoting/netty/NettyRemotingServer.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 94ffd8d07..445f06cc6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -459,13 +459,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) { try { - ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in); - if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf); + if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { return; } - if (ha.state() == ProtocolDetectionState.DETECTED) { + if (detectionResult.state() == ProtocolDetectionState.DETECTED) { ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); @@ -481,7 +481,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } // Hand over this message to the next . - ctx.fireChannelRead(in.retain()); + ctx.fireChannelRead(byteBuf.retain()); } catch (Exception e) { log.error("process proxy protocol negotiator failed.", e); throw e; @@ -503,8 +503,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { - // Peek the first byte to determine if the content is starting with TLS handshake - byte b = msg.getByte(0); + // Peek the current read index byte to determine if the content is starting with TLS handshake + byte b = msg.getByte(msg.readerIndex()); if (b == HANDSHAKE_MAGIC_CODE) { switch (tlsMode) { -- 2.32.0.windows.2 From 8e6b5e62bd4da78c0a7d265891c52685fcffd08a Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan <zhouxzhan@apache.org> Date: Mon, 10 Jul 2023 20:14:17 +0800 Subject: [PATCH 5/5] [ISSUE #6999] Add interface ReceiptHandleManager (#7000) * Add interface ReceiptHandleManager * fix unit test * fix --- .../processor/ReceiptHandleProcessor.java | 10 +- .../receipt/DefaultReceiptHandleManager.java | 282 ++++++++++++++++++ .../service/receipt/ReceiptHandleManager.java | 260 +--------------- ...a => DefaultReceiptHandleManagerTest.java} | 34 +-- 4 files changed, 307 insertions(+), 279 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java rename proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/{ReceiptHandleManagerTest.java => DefaultReceiptHandleManagerTest.java} (93%) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 9c7e8dea9..fc49e7622 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager; +import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; import org.apache.rocketmq.proxy.service.ServiceManager; public class ReceiptHandleProcessor extends AbstractProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected ReceiptHandleManager receiptHandleManager; + protected DefaultReceiptHandleManager receiptHandleManager; public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { super(messagingProcessor, serviceManager); @@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { event.getFuture().complete(v); }); }; - this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); + this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); } protected ProxyContext createContext(String actionName) { @@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends AbstractProcessor { } public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { - receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle); } public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) { - return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle); + return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); } public static class ReceiptHandleGroupKey { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java new file mode 100644 index 000000000..c7633d658 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.receipt; + +import com.google.common.base.Stopwatch; +import io.netty.channel.Channel; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.consumer.ReceiptHandle; +import org.apache.rocketmq.common.state.StateEventListener; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.RenewEvent; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final MetadataService metadataService; + protected final ConsumerManager consumerManager; + protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; + protected final StateEventListener<RenewEvent> eventListener; + protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); + protected final ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); + protected final ThreadPoolExecutor renewalWorkerService; + + public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { + this.metadataService = metadataService; + this.consumerManager = consumerManager; + this.eventListener = eventListener; + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( + proxyConfig.getRenewThreadPoolNums(), + proxyConfig.getRenewMaxThreadPoolNums(), + 1, TimeUnit.MINUTES, + "RenewalWorkerThread", + proxyConfig.getRenewThreadPoolQueueCapacity() + ); + consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { + @Override + public void handle(ConsumerGroupEvent event, String group, Object... args) { + if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { + if (args == null || args.length < 1) { + return; + } + if (args[0] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { + // if the channel sync from other proxy is expired, not to clear data of connect to current proxy + return; + } + clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); + log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); + } + } + } + + @Override + public void shutdown() { + + } + }); + this.receiptHandleGroupMap = new ConcurrentHashMap<>(); + this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); + this.appendStartAndShutdown(new StartAndShutdown() { + @Override + public void start() throws Exception { + scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, + ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() throws Exception { + scheduledExecutorService.shutdown(); + clearAllHandle(); + } + }); + } + + public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { + ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), + k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); + } + + public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) { + ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); + if (handleGroup == null) { + return null; + } + return handleGroup.remove(msgID, receiptHandle); + } + + protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { + return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; + } + + protected void scheduleRenewTask() { + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { + ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); + if (clientIsOffline(key)) { + clearGroup(key); + continue; + } + + ReceiptHandleGroup group = entry.getValue(); + group.scan((msgID, handleStr, v) -> { + long current = System.currentTimeMillis(); + ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); + if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { + return; + } + renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); + }); + } + } catch (Exception e) { + log.error("unexpect error when schedule renew task", e); + } + + log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); + } + + protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { + try { + group.computeIfPresent(msgID, handleStr, this::startRenewMessage); + } catch (Exception e) { + log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); + } + } + + protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { + CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + long current = System.currentTimeMillis(); + try { + if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { + log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); + return CompletableFuture.completedFuture(null); + } + if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when renew. handle:{}", messageReceiptHandle, throwable); + if (renewExceptionNeedRetry(throwable)) { + messageReceiptHandle.incrementAndGetRenewRetryTimes(); + resFuture.complete(messageReceiptHandle); + } else { + resFuture.complete(null); + } + } else if (AckStatus.OK.equals(ackResult.getStatus())) { + messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); + messageReceiptHandle.resetRenewRetryTimes(); + messageReceiptHandle.incrementRenewTimes(); + resFuture.complete(messageReceiptHandle); + } else { + log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); + resFuture.complete(null); + } + }); + } else { + ProxyContext context = createContext("RenewMessage"); + SubscriptionGroupConfig subscriptionGroupConfig = + metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); + if (subscriptionGroupConfig == null) { + log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); + return CompletableFuture.completedFuture(null); + } + RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); + } + resFuture.complete(null); + }); + } + } catch (Throwable t) { + log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); + resFuture.complete(null); + } + return resFuture; + } + + protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { + if (key == null) { + return; + } + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); + if (handleGroup == null) { + return; + } + handleGroup.scan((msgID, handle, v) -> { + try { + handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { + CompletableFuture<AckResult> future = new CompletableFuture<>(); + eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); + return CompletableFuture.completedFuture(null); + }); + } catch (Exception e) { + log.error("error when clear handle for group. key:{}", key, e); + } + }); + } + + protected void clearAllHandle() { + log.info("start clear all handle in receiptHandleProcessor"); + Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); + for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { + clearGroup(key); + } + log.info("clear all handle in receiptHandleProcessor done"); + } + + protected boolean renewExceptionNeedRetry(Throwable t) { + t = ExceptionUtils.getRealException(t); + if (t instanceof ProxyException) { + ProxyException proxyException = (ProxyException) t; + if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || + ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { + return false; + } + } + return true; + } + + protected ProxyContext createContext(String actionName) { + return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java index f3b805624..6a8888e97 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java @@ -17,266 +17,12 @@ package org.apache.rocketmq.proxy.service.receipt; -import com.google.common.base.Stopwatch; import io.netty.channel.Channel; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.broker.client.ClientChannelInfo; -import org.apache.rocketmq.broker.client.ConsumerGroupEvent; -import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; -import org.apache.rocketmq.broker.client.ConsumerManager; -import org.apache.rocketmq.client.consumer.AckResult; -import org.apache.rocketmq.client.consumer.AckStatus; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.consumer.ReceiptHandle; -import org.apache.rocketmq.common.state.StateEventListener; -import org.apache.rocketmq.common.thread.ThreadPoolMonitor; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; -import org.apache.rocketmq.common.utils.StartAndShutdown; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.common.ProxyException; -import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; -import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; -import org.apache.rocketmq.proxy.common.channel.ChannelHelper; -import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; -import org.apache.rocketmq.proxy.service.metadata.MetadataService; -import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; -import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; -public class ReceiptHandleManager extends AbstractStartAndShutdown { - protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected final MetadataService metadataService; - protected final ConsumerManager consumerManager; - protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; - protected final StateEventListener<RenewEvent> eventListener; - protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); - protected final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); - protected final ThreadPoolExecutor renewalWorkerService; +public interface ReceiptHandleManager { + void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); - public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { - this.metadataService = metadataService; - this.consumerManager = consumerManager; - this.eventListener = eventListener; - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( - proxyConfig.getRenewThreadPoolNums(), - proxyConfig.getRenewMaxThreadPoolNums(), - 1, TimeUnit.MINUTES, - "RenewalWorkerThread", - proxyConfig.getRenewThreadPoolQueueCapacity() - ); - consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { - @Override - public void handle(ConsumerGroupEvent event, String group, Object... args) { - if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { - if (args == null || args.length < 1) { - return; - } - if (args[0] instanceof ClientChannelInfo) { - ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { - // if the channel sync from other proxy is expired, not to clear data of connect to current proxy - return; - } - clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); - log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); - } - } - } - - @Override - public void shutdown() { - - } - }); - this.receiptHandleGroupMap = new ConcurrentHashMap<>(); - this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); - this.appendStartAndShutdown(new StartAndShutdown() { - @Override - public void start() throws Exception { - scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, - ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public void shutdown() throws Exception { - scheduledExecutorService.shutdown(); - clearAllHandle(); - } - }); - } - - public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { - ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), - k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); - } - - public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) { - ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); - if (handleGroup == null) { - return null; - } - return handleGroup.remove(msgID, receiptHandle); - } - - protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { - return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; - } - - public void scheduleRenewTask() { - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { - ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); - if (clientIsOffline(key)) { - clearGroup(key); - continue; - } - - ReceiptHandleGroup group = entry.getValue(); - group.scan((msgID, handleStr, v) -> { - long current = System.currentTimeMillis(); - ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); - if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { - return; - } - renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); - }); - } - } catch (Exception e) { - log.error("unexpect error when schedule renew task", e); - } - - log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); - } - - protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { - try { - group.computeIfPresent(msgID, handleStr, this::startRenewMessage); - } catch (Exception e) { - log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); - } - } - - protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { - CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - long current = System.currentTimeMillis(); - try { - if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { - log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); - return CompletableFuture.completedFuture(null); - } - if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { - CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); - future.whenComplete((ackResult, throwable) -> { - if (throwable != null) { - log.error("error when renew. handle:{}", messageReceiptHandle, throwable); - if (renewExceptionNeedRetry(throwable)) { - messageReceiptHandle.incrementAndGetRenewRetryTimes(); - resFuture.complete(messageReceiptHandle); - } else { - resFuture.complete(null); - } - } else if (AckStatus.OK.equals(ackResult.getStatus())) { - messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); - messageReceiptHandle.resetRenewRetryTimes(); - messageReceiptHandle.incrementRenewTimes(); - resFuture.complete(messageReceiptHandle); - } else { - log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); - resFuture.complete(null); - } - }); - } else { - ProxyContext context = createContext("RenewMessage"); - SubscriptionGroupConfig subscriptionGroupConfig = - metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); - if (subscriptionGroupConfig == null) { - log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); - return CompletableFuture.completedFuture(null); - } - RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); - CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); - future.whenComplete((ackResult, throwable) -> { - if (throwable != null) { - log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); - } - resFuture.complete(null); - }); - } - } catch (Throwable t) { - log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); - resFuture.complete(null); - } - return resFuture; - } - - protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { - if (key == null) { - return; - } - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); - if (handleGroup == null) { - return; - } - handleGroup.scan((msgID, handle, v) -> { - try { - handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { - CompletableFuture<AckResult> future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); - return CompletableFuture.completedFuture(null); - }); - } catch (Exception e) { - log.error("error when clear handle for group. key:{}", key, e); - } - }); - } - - public void clearAllHandle() { - log.info("start clear all handle in receiptHandleProcessor"); - Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); - for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { - clearGroup(key); - } - log.info("clear all handle in receiptHandleProcessor done"); - } - - protected boolean renewExceptionNeedRetry(Throwable t) { - t = ExceptionUtils.getRealException(t); - if (t instanceof ProxyException) { - ProxyException proxyException = (ProxyException) t; - if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || - ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { - return false; - } - } - return true; - } - - protected ProxyContext createContext(String actionName) { - return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); - } + MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java similarity index 93% rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 877c9fd6f..7c6943e44 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class ReceiptHandleManagerTest extends BaseServiceTest { - private ReceiptHandleManager receiptHandleManager; +public class DefaultReceiptHandleManagerTest extends BaseServiceTest { + private DefaultReceiptHandleManager receiptHandleManager; @Mock protected MessagingProcessor messagingProcessor; @Mock @@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { @Before public void setup() { - receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { + receiptHandleManager = new DefaultReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { @Override public void fireEvent(RenewEvent event) { MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); @@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { @Test public void testAddReceiptHandle() { Channel channel = new LocalChannel(); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); } - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { public void testRenewReceiptHandle() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { public void testRenewExceedMaxRenewTimes() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); @@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { public void testRenewWithInvalidHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); @@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); AtomicInteger count = new AtomicInteger(0); List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); @@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) @@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRemoveReceiptHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleManager.scheduleRenewTask(); @@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { @Test public void testClearGroup() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } -- 2.32.0.windows.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2