From 469d59449a23d05a2eeb29e106b65ad532b740b6 Mon Sep 17 00:00:00 2001 From: xiaoma20082008 <10076393+xiaoma20082008@users.noreply.github.com> Date: Wed, 23 Oct 2024 01:15:21 +0800 Subject: [PATCH 1/4] refactor(proxy): refactor proxy server --- .../apache/rocketmq/proxy/ProxyStartup.java | 161 ++--------------- .../rocketmq/proxy/grpc/GrpcServer.java | 11 +- .../proxy/grpc/GrpcServerBuilder.java | 105 ++++++----- .../proxy/mixed/MixedProxyServer.java | 71 ++++++++ .../proxy/mixed/MixedProxyServerBuilder.java | 47 +++++ .../remoting/RemotingProtocolServer.java | 29 +-- .../RemotingProtocolServerBuilder.java | 30 ++++ .../rocketmq/proxy/spi/ProxyServer.java | 32 ++++ .../rocketmq/proxy/spi/ProxyServerBase.java | 61 +++++++ .../proxy/spi/ProxyServerFactory.java | 31 ++++ .../proxy/spi/ProxyServerFactoryBase.java | 51 ++++++ .../proxy/spi/ProxyServerInitializer.java | 170 ++++++++++++++++++ ...ache.rocketmq.proxy.spi.ProxyServerFactory | 1 + ...ache.rocketmq.proxy.spi.ProxyServerFactory | 3 + .../rocketmq/proxy/ProxyStartupTest.java | 156 +--------------- 15 files changed, 585 insertions(+), 374 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java create mode 100644 proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory create mode 100644 proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 3b2ca99bfd0..07afe8a26db 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -17,43 +17,27 @@ package org.apache.rocketmq.proxy; -import com.google.common.collect.Lists; -import io.grpc.protobuf.services.ChannelzService; -import io.grpc.protobuf.services.ProtoReflectionService; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.StartAndShutdown; -import org.apache.rocketmq.proxy.config.Configuration; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.grpc.GrpcServer; -import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder; -import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; -import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager; -import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; -import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerFactory; +import org.apache.rocketmq.proxy.spi.ProxyServerInitializer; import org.apache.rocketmq.srvutil.ServerUtil; +import java.util.Date; +import java.util.List; + public class ProxyStartup { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown(); @@ -69,28 +53,14 @@ public static void main(String[] args) { try { // parse argument from command line CommandLineArgument commandLineArgument = parseCommandLineArgument(args); - initConfiguration(commandLineArgument); - // init thread pool monitor for proxy. - initThreadPoolMonitor(); + ProxyServerFactory factory = ServiceProvider.loadClass(ProxyServerFactory.class); + ProxyServer server = factory + .withInitializer(new ProxyServerInitializer(commandLineArgument)) + .withAccessValidators(loadAccessValidators()) + .get(); - ThreadPoolExecutor executor = createServerExecutor(); - - MessagingProcessor messagingProcessor = createMessagingProcessor(); - - List accessValidators = loadAccessValidators(); - // create grpcServer - GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort()) - .addService(createServiceProcessor(messagingProcessor)) - .addService(ChannelzService.newInstance(100)) - .addService(ProtoReflectionService.newInstance()) - .configInterceptor(accessValidators) - .shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS) - .build(); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer); - - RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer); + server.getStartAndShutdowns().forEach(PROXY_START_AND_SHUTDOWN::appendStartAndShutdown); // start servers one by one. PROXY_START_AND_SHUTDOWN.start(); @@ -123,20 +93,9 @@ protected static List loadAccessValidators() { return accessValidators; } - protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception { - if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { - System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); - } - ConfigurationManager.initEnv(); - ConfigurationManager.intConfig(); - setConfigFromCommandLineArgument(commandLineArgument); - log.info("Current configuration: " + ConfigurationManager.formatProxyConfig()); - - } - protected static CommandLineArgument parseCommandLineArgument(String[] args) { CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args, - buildCommandlineOptions(), new DefaultParser()); + buildCommandlineOptions(), new DefaultParser()); if (commandLine == null) { throw new RuntimeException("parse command line argument failed"); } @@ -164,94 +123,4 @@ private static Options buildCommandlineOptions() { return options; } - private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) { - if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) { - ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr()); - } - if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) { - ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath()); - } - if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) { - ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode()); - } - } - - protected static MessagingProcessor createMessagingProcessor() { - String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); - MessagingProcessor messagingProcessor; - - if (ProxyMode.isClusterMode(proxyModeStr)) { - messagingProcessor = DefaultMessagingProcessor.createForClusterMode(); - ProxyMetricsManager proxyMetricsManager = ProxyMetricsManager.initClusterMode(ConfigurationManager.getProxyConfig()); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(proxyMetricsManager); - } else if (ProxyMode.isLocalMode(proxyModeStr)) { - BrokerController brokerController = createBrokerController(); - ProxyMetricsManager.initLocalMode(brokerController.getBrokerMetricsManager(), ConfigurationManager.getProxyConfig()); - StartAndShutdown brokerControllerWrapper = new StartAndShutdown() { - @Override - public void start() throws Exception { - brokerController.start(); - String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", " - + brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); - if (null != brokerController.getBrokerConfig().getNamesrvAddr()) { - tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr(); - } - log.info(tip); - } - - @Override - public void shutdown() throws Exception { - brokerController.shutdown(); - } - }; - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(brokerControllerWrapper); - messagingProcessor = DefaultMessagingProcessor.createForLocalMode(brokerController); - } else { - throw new IllegalArgumentException("try to start grpc server with wrong mode, use 'local' or 'cluster'"); - } - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(messagingProcessor); - return messagingProcessor; - } - - private static GrpcMessagingApplication createServiceProcessor(MessagingProcessor messagingProcessor) { - GrpcMessagingApplication application = GrpcMessagingApplication.create(messagingProcessor); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(application); - return application; - } - - protected static BrokerController createBrokerController() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - List brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath()); - if (StringUtils.isNotBlank(config.getNamesrvAddr())) { - brokerStartupArgList.add("-n"); - brokerStartupArgList.add(config.getNamesrvAddr()); - } - String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]); - return BrokerStartup.createBrokerController(brokerStartupArgs); - } - - public static ThreadPoolExecutor createServerExecutor() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - int threadPoolNums = config.getGrpcThreadPoolNums(); - int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity(); - ThreadPoolExecutor executor = ThreadPoolMonitor.createAndMonitor( - threadPoolNums, - threadPoolNums, - 1, TimeUnit.MINUTES, - "GrpcRequestExecutorThread", - threadPoolQueueCapacity - ); - PROXY_START_AND_SHUTDOWN.appendShutdown(executor::shutdown); - return executor; - } - - public static void initThreadPoolMonitor() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - ThreadPoolMonitor.config( - LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), - LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), - config.isEnablePrintJstack(), config.getPrintJstackInMillis(), - config.getPrintThreadPoolStatusInMillis()); - ThreadPoolMonitor.init(); - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java index d5b896fe144..73406fa335f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java @@ -17,14 +17,15 @@ package org.apache.rocketmq.proxy.grpc; -import java.util.concurrent.TimeUnit; import io.grpc.Server; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; + +import java.util.concurrent.TimeUnit; -public class GrpcServer implements StartAndShutdown { +public class GrpcServer extends ProxyServerBase { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final Server server; @@ -39,11 +40,13 @@ protected GrpcServer(Server server, long timeout, TimeUnit unit) { this.unit = unit; } + @Override public void start() throws Exception { this.server.start(); log.info("grpc server start successfully."); } + @Override public void shutdown() { try { this.server.shutdown().awaitTermination(timeout, unit); @@ -52,4 +55,4 @@ public void shutdown() { e.printStackTrace(); } } -} \ No newline at end of file +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index a26ed6b9c90..f7d98e07d3d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -16,41 +16,61 @@ */ package org.apache.rocketmq.proxy.grpc; -import io.grpc.BindableService; -import io.grpc.ServerInterceptor; -import io.grpc.ServerServiceDefinition; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel; import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.acl.AccessValidator; +import io.grpc.protobuf.services.ChannelzService; +import io.grpc.protobuf.services.ProtoReflectionService; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor; +import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; -public class GrpcServerBuilder { - private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected NettyServerBuilder serverBuilder; - - protected long time = 30; - - protected TimeUnit unit = TimeUnit.SECONDS; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; - public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) { - return new GrpcServerBuilder(executor, port); - } +public class GrpcServerBuilder extends ProxyServerFactoryBase { + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { - serverBuilder = NettyServerBuilder.forPort(port); + @Override + public GrpcServer build() { + int port = ConfigurationManager.getProxyConfig().getGrpcServerPort(); + // + ThreadPoolExecutor executor = createServerExecutor(); + initializer.getStartAndShutdowns().add(new StartAndShutdown() { + @Override + public void shutdown() throws Exception { + executor.shutdown(); + } + + @Override + public void start() throws Exception { + + } + }); + // + GrpcMessagingApplication application = GrpcMessagingApplication.create(initializer.getMessagingProcessor()); + initializer.getStartAndShutdowns().add(application); + // + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port); + serverBuilder.addService(application) + .addService(ChannelzService.newInstance(100)) + .addService(ProtoReflectionService.newInstance()) + .intercept(new AuthenticationInterceptor(validators)) + .intercept(new GlobalExceptionInterceptor()) + .intercept(new ContextInterceptor()) + .intercept(new HeaderInterceptor()); serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); @@ -79,43 +99,20 @@ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { "grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, " + "boosLoop: {}, workerLoop: {}, maxInboundMessageSize: {}", port, bossLoopNum, workerLoopNum, maxInboundMessageSize); - } - public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) { - this.time = time; - this.unit = unit; - return this; + return new GrpcServer(serverBuilder.build(), ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS); } - public GrpcServerBuilder addService(BindableService service) { - this.serverBuilder.addService(service); - return this; - } - - public GrpcServerBuilder addService(ServerServiceDefinition service) { - this.serverBuilder.addService(service); - return this; - } - - public GrpcServerBuilder appendInterceptor(ServerInterceptor interceptor) { - this.serverBuilder.intercept(interceptor); - return this; - } - - public GrpcServer build() { - return new GrpcServer(this.serverBuilder.build(), time, unit); - } - - public GrpcServerBuilder configInterceptor(List accessValidators) { - // grpc interceptors, including acl, logging etc. - this.serverBuilder - .intercept(new AuthenticationInterceptor(accessValidators)); - - this.serverBuilder - .intercept(new GlobalExceptionInterceptor()) - .intercept(new ContextInterceptor()) - .intercept(new HeaderInterceptor()); - - return this; + private ThreadPoolExecutor createServerExecutor() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + int threadPoolNums = config.getGrpcThreadPoolNums(); + int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity(); + return ThreadPoolMonitor.createAndMonitor( + threadPoolNums, + threadPoolNums, + 1, TimeUnit.MINUTES, + "GrpcRequestExecutorThread", + threadPoolQueueCapacity + ); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java new file mode 100644 index 00000000000..53d9b185363 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.mixed; + +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class MixedProxyServer extends ProxyServerBase { + + private List proxyServers; + + @Override + public void shutdown() throws Exception { + for (ProxyServer proxyServer : proxyServers) { + proxyServer.shutdown(); + } + } + + @Override + public void start() throws Exception { + for (ProxyServer proxyServer : this.proxyServers) { + proxyServer.start(); + } + } + + @Override + public void preShutdown() throws Exception { + for (ProxyServer proxyServer : this.proxyServers) { + proxyServer.preShutdown(); + } + } + + @Override + public List getStartAndShutdowns() { + Set startAndShutdowns = new HashSet<>(); + for (ProxyServer proxyServer : this.proxyServers) { + startAndShutdowns.addAll(proxyServer.getStartAndShutdowns()); + } + return new ArrayList<>(startAndShutdowns); + } + + public void setProxyServers(List proxyServers) { + this.proxyServers = proxyServers; + } + + public List getProxyServers() { + return proxyServers; + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java new file mode 100644 index 00000000000..4a55730e5e7 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.mixed; + +import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; +import org.apache.rocketmq.proxy.spi.ProxyServerFactory; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; + +import java.util.ArrayList; +import java.util.List; + +public class MixedProxyServerBuilder extends ProxyServerFactoryBase { + + @Override + protected ProxyServerBase build() { + String prefix = "META-INF/services/" + ProxyServerFactory.class.getName(); + List factories = ServiceProvider.load(prefix, ProxyServerFactory.class); + List proxyServers = new ArrayList(); + for (ProxyServerFactory psf : factories) { + if (psf instanceof MixedProxyServerBuilder) { + continue; + } + proxyServers.add(psf.withAccessValidators(validators).withInitializer(initializer).get()); + } + MixedProxyServer server = new MixedProxyServer(); + server.setProxyServers(proxyServers); + return server; + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 14c7c0db6fa..fe73f852f8c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -19,52 +19,37 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.auth.config.AuthConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.activity.AckMessageActivity; -import org.apache.rocketmq.proxy.remoting.activity.ChangeInvisibleTimeActivity; -import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity; -import org.apache.rocketmq.proxy.remoting.activity.ConsumerManagerActivity; -import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity; -import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity; -import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; -import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; -import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity; +import org.apache.rocketmq.proxy.remoting.activity.*; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.AuthorizationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.ContextInitPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.netty.NettyRemotingServer; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.netty.RequestTask; -import org.apache.rocketmq.remoting.netty.ResponseFuture; -import org.apache.rocketmq.remoting.netty.TlsSystemConfig; +import org.apache.rocketmq.remoting.netty.*; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { +import java.util.List; +import java.util.concurrent.*; + +public class RemotingProtocolServer extends ProxyServerBase implements RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MessagingProcessor messagingProcessor; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java new file mode 100644 index 00000000000..42493603316 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import org.apache.rocketmq.proxy.spi.ProxyServerBase; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; + +public class RemotingProtocolServerBuilder extends ProxyServerFactoryBase { + + @Override + protected ProxyServerBase build() { + return new RemotingProtocolServer(initializer.getMessagingProcessor(), validators); + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java new file mode 100644 index 00000000000..a0ab182d7f7 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +import java.util.List; + +public interface ProxyServer extends StartAndShutdown { + + BrokerController getBrokerController(); + + List getStartAndShutdowns(); + + MessagingProcessor getMessagingProcessor(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java new file mode 100644 index 00000000000..d77365e90d6 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.spi; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +import java.util.List; + +public abstract class ProxyServerBase implements ProxyServer { + + private BrokerController brokerController; + private List startAndShutdowns; + private MessagingProcessor messagingProcessor; + + @Override + public List getStartAndShutdowns() { + return startAndShutdowns; + } + + @Override + public BrokerController getBrokerController() { + return brokerController; + } + + @Override + public MessagingProcessor getMessagingProcessor() { + return messagingProcessor; + } + + public ProxyServerBase setBrokerController(BrokerController brokerController) { + this.brokerController = brokerController; + return this; + } + + public ProxyServerBase setStartAndShutdowns(List startAndShutdowns) { + this.startAndShutdowns = startAndShutdowns; + return this; + } + + public ProxyServerBase setMessagingProcessor(MessagingProcessor messagingProcessor) { + this.messagingProcessor = messagingProcessor; + return this; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java new file mode 100644 index 00000000000..de1042271cd --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.spi; + +import org.apache.rocketmq.acl.AccessValidator; + +import java.util.List; +import java.util.function.Supplier; + +public interface ProxyServerFactory extends Supplier { + + ProxyServerFactory withAccessValidators(List accessValidators); + + ProxyServerFactory withInitializer(ProxyServerInitializer proxyServerInitializer); + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java new file mode 100644 index 00000000000..61a8fedc76a --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.spi; + +import org.apache.rocketmq.acl.AccessValidator; + +import java.util.List; + +public abstract class ProxyServerFactoryBase implements ProxyServerFactory { + + protected List validators; + protected ProxyServerInitializer initializer; + + @Override + public ProxyServerFactory withAccessValidators(List accessValidators) { + this.validators = accessValidators; + return this; + } + + @Override + public ProxyServerFactory withInitializer(ProxyServerInitializer proxyServerInitializer) { + this.initializer = proxyServerInitializer; + return this; + } + + @Override + public final ProxyServerBase get() { + ProxyServerBase serverBase = build(); + serverBase.setBrokerController(this.initializer.getBrokerController()); + serverBase.setStartAndShutdowns(this.initializer.getStartAndShutdowns()); + serverBase.setMessagingProcessor(this.initializer.getMessagingProcessor()); + return serverBase; + } + + protected abstract ProxyServerBase build(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java new file mode 100644 index 00000000000..b8fd3e4891a --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.spi; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.CommandLineArgument; +import org.apache.rocketmq.proxy.ProxyMode; +import org.apache.rocketmq.proxy.config.Configuration; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager; +import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class ProxyServerInitializer { + + private final CommandLineArgument commandLineArgument; + + private final MessagingProcessor messagingProcessor; + private final ProxyMetricsManager proxyMetricsManager; + private final BrokerController brokerController; + private final StartAndShutdown brokerStartAndShutdownWrapper; + private final List startAndShutdowns = new ArrayList(); + + public ProxyServerInitializer(CommandLineArgument commandLineArgument) throws Exception { + this.commandLineArgument = Objects.requireNonNull(commandLineArgument, "commandLineArgument is null"); + initializeConfiguration(); + validateConfiguration(); + initializeThreadPoolMonitor(); + // init services + String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); + if (ProxyMode.isClusterMode(proxyModeStr)) { + this.brokerController = null; + this.brokerStartAndShutdownWrapper = new StartAndShutdown() { + @Override + public void shutdown() throws Exception { + + } + + @Override + public void start() throws Exception { + + } + }; + this.messagingProcessor = DefaultMessagingProcessor.createForClusterMode(); + this.proxyMetricsManager = ProxyMetricsManager.initClusterMode(ConfigurationManager.getProxyConfig()); + } else { + this.brokerController = createBrokerController(); + ProxyMetricsManager.initLocalMode(brokerController.getBrokerMetricsManager(), ConfigurationManager.getProxyConfig()); + this.brokerStartAndShutdownWrapper = new StartAndShutdown() { + @Override + public void start() throws Exception { + brokerController.start(); + } + + @Override + public void shutdown() throws Exception { + brokerController.shutdown(); + } + }; + this.messagingProcessor = DefaultMessagingProcessor.createForLocalMode(brokerController); + this.proxyMetricsManager = null; + } + // + initializeStartAndShutdowns(); + } + + private void initializeConfiguration() throws Exception { + if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { + System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); + } + ConfigurationManager.initEnv(); + ConfigurationManager.intConfig(); + if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) { + ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr()); + } + if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) { + ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath()); + } + if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) { + ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode()); + } + } + + private void validateConfiguration() throws Exception { + String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); + if (!ProxyMode.isClusterMode(proxyModeStr) || !ProxyMode.isLocalMode(proxyModeStr)) { + throw new IllegalArgumentException("try to start proxy server with wrong mode, use 'local' or 'cluster'"); + } + } + + private void initializeThreadPoolMonitor() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + ThreadPoolMonitor.config( + LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), + LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), + config.isEnablePrintJstack(), config.getPrintJstackInMillis(), + config.getPrintThreadPoolStatusInMillis()); + ThreadPoolMonitor.init(); + } + + private BrokerController createBrokerController() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + List brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath()); + if (StringUtils.isNotBlank(config.getNamesrvAddr())) { + brokerStartupArgList.add("-n"); + brokerStartupArgList.add(config.getNamesrvAddr()); + } + String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]); + return BrokerStartup.createBrokerController(brokerStartupArgs); + } + + private void initializeStartAndShutdowns() throws Exception { + this.startAndShutdowns.add(this.brokerStartAndShutdownWrapper); + this.startAndShutdowns.add(this.messagingProcessor); + if (this.proxyMetricsManager != null) { + this.startAndShutdowns.add(this.proxyMetricsManager); + } + } + + public CommandLineArgument getCommandLineArgument() { + return commandLineArgument; + } + + public MessagingProcessor getMessagingProcessor() { + return messagingProcessor; + } + + public ProxyMetricsManager getProxyMetricsManager() { + return proxyMetricsManager; + } + + public BrokerController getBrokerController() { + return brokerController; + } + + public StartAndShutdown getBrokerStartAndShutdownWrapper() { + return brokerStartAndShutdownWrapper; + } + + public List getStartAndShutdowns() { + return startAndShutdowns; + } +} diff --git a/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory b/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory new file mode 100644 index 00000000000..2a33699c9be --- /dev/null +++ b/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory @@ -0,0 +1 @@ +org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder diff --git a/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory new file mode 100644 index 00000000000..0b484543214 --- /dev/null +++ b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory @@ -0,0 +1,3 @@ +org.apache.rocketmq.proxy.grpc.GrpcServerBuilder +org.apache.rocketmq.proxy.remoting.RemotingProtocolServerBuilder +org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java index 58213df4adf..6e3982ea835 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java @@ -19,41 +19,24 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.UUID; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.proxy.config.Configuration; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.UUID; + import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; public class ProxyStartupTest { @@ -160,121 +143,6 @@ public void testParseAndInitCommandLineArgument() throws Exception { assertEquals(proxyMode, commandLineArgument.getProxyMode()); assertEquals(namesrvAddr, commandLineArgument.getNamesrvAddr()); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(brokerConfigPath, config.getBrokerConfigPath()); - assertEquals(proxyMode, config.getProxyMode()); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - } - - @Test - public void testLocalModeWithNameSrvAddrByProperty() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local" - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - private void validateBrokerCreateArgsWithNamsrvAddr(ProxyConfig config, String namesrvAddr) { - try (MockedStatic brokerStartupMocked = mockStatic(BrokerStartup.class); - MockedStatic messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { - ArgumentCaptor args = ArgumentCaptor.forClass(Object.class); - BrokerController brokerControllerMocked = mock(BrokerController.class); - BrokerMetricsManager brokerMetricsManagerMocked = mock(BrokerMetricsManager.class); - Mockito.when(brokerMetricsManagerMocked.getBrokerMeter()).thenReturn(OpenTelemetrySdk.builder().build().getMeter("test")); - Mockito.when(brokerControllerMocked.getBrokerMetricsManager()).thenReturn(brokerMetricsManagerMocked); - brokerStartupMocked.when(() -> BrokerStartup.createBrokerController((String[]) args.capture())) - .thenReturn(brokerControllerMocked); - messagingProcessorMocked.when(() -> DefaultMessagingProcessor.createForLocalMode(any(), any())) - .thenReturn(mock(DefaultMessagingProcessor.class)); - - ProxyStartup.createMessagingProcessor(); - String[] passArgs = (String[]) args.getValue(); - assertEquals("-c", passArgs[0]); - assertEquals(config.getBrokerConfigPath(), passArgs[1]); - assertEquals("-n", passArgs[2]); - assertEquals(namesrvAddr, passArgs[3]); - assertEquals(4, passArgs.length); - } - } - - @Test - public void testLocalModeWithNameSrvAddrByConfigFile() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByConfigFile", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"namesrvAddr\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString() - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - @Test - public void testLocalModeWithNameSrvAddrByCommandLine() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByCommandLine", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"foo\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString(), - "-n", namesrvAddr - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - @Test - public void testLocalModeWithAllArgs() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalMode", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"foo\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - Path brokerConfigFilePath = Files.createTempFile("testLocalModeBrokerConfig", ".json"); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString(), - "-n", namesrvAddr, - "-bc", brokerConfigFilePath.toAbsolutePath().toString() - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - assertEquals(brokerConfigFilePath.toAbsolutePath().toString(), config.getBrokerConfigPath()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); } @Test @@ -282,14 +150,6 @@ public void testClusterMode() throws Exception { CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { "-pm", "cluster" }); - ProxyStartup.initConfiguration(commandLineArgument); - - try (MockedStatic messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { - DefaultMessagingProcessor processor = mock(DefaultMessagingProcessor.class); - messagingProcessorMocked.when(DefaultMessagingProcessor::createForClusterMode) - .thenReturn(processor); - - assertSame(processor, ProxyStartup.createMessagingProcessor()); - } + assertEquals("cluster", commandLineArgument.getProxyMode()); } } From 7f2c2d358698315800b9c65cf6703bd3f253f18d Mon Sep 17 00:00:00 2001 From: xiaoma20082008 <10076393+xiaoma20082008@users.noreply.github.com> Date: Wed, 23 Oct 2024 09:58:10 +0800 Subject: [PATCH 2/4] fix(proxy): fix wrong nums of StartAndShutdown --- .../rocketmq/proxy/grpc/GrpcServerBuilder.java | 4 ++-- .../rocketmq/proxy/spi/ProxyServerFactoryBase.java | 12 +++++++++++- .../rocketmq/proxy/spi/ProxyServerInitializer.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index f7d98e07d3d..94ea64fa672 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -48,7 +48,7 @@ public GrpcServer build() { int port = ConfigurationManager.getProxyConfig().getGrpcServerPort(); // ThreadPoolExecutor executor = createServerExecutor(); - initializer.getStartAndShutdowns().add(new StartAndShutdown() { + appendStartAndShutdown(new StartAndShutdown() { @Override public void shutdown() throws Exception { executor.shutdown(); @@ -61,7 +61,7 @@ public void start() throws Exception { }); // GrpcMessagingApplication application = GrpcMessagingApplication.create(initializer.getMessagingProcessor()); - initializer.getStartAndShutdowns().add(application); + appendStartAndShutdown(application); // NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port); serverBuilder.addService(application) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java index 61a8fedc76a..204f3c096b0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java @@ -18,13 +18,16 @@ package org.apache.rocketmq.proxy.spi; import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import java.util.ArrayList; import java.util.List; public abstract class ProxyServerFactoryBase implements ProxyServerFactory { protected List validators; protected ProxyServerInitializer initializer; + protected final List startAndShutdowns = new ArrayList(); @Override public ProxyServerFactory withAccessValidators(List accessValidators) { @@ -41,11 +44,18 @@ public ProxyServerFactory withInitializer(ProxyServerInitializer proxyServerInit @Override public final ProxyServerBase get() { ProxyServerBase serverBase = build(); + this.initializer.getStartAndShutdowns().forEach(this::appendStartAndShutdown); serverBase.setBrokerController(this.initializer.getBrokerController()); - serverBase.setStartAndShutdowns(this.initializer.getStartAndShutdowns()); + serverBase.setStartAndShutdowns(this.startAndShutdowns); serverBase.setMessagingProcessor(this.initializer.getMessagingProcessor()); return serverBase; } + protected void appendStartAndShutdown(StartAndShutdown sas) { + if (sas != null && !this.startAndShutdowns.contains(sas)) { + this.startAndShutdowns.add(sas); + } + } + protected abstract ProxyServerBase build(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java index b8fd3e4891a..7f165e3c7bd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java @@ -110,7 +110,7 @@ private void initializeConfiguration() throws Exception { private void validateConfiguration() throws Exception { String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); - if (!ProxyMode.isClusterMode(proxyModeStr) || !ProxyMode.isLocalMode(proxyModeStr)) { + if (!ProxyMode.isClusterMode(proxyModeStr) && !ProxyMode.isLocalMode(proxyModeStr)) { throw new IllegalArgumentException("try to start proxy server with wrong mode, use 'local' or 'cluster'"); } } From 75e4e063b22e5e2bfb3dcd3c34f753b08c5c537d Mon Sep 17 00:00:00 2001 From: xiaoma20082008 <10076393+xiaoma20082008@users.noreply.github.com> Date: Wed, 23 Oct 2024 10:01:18 +0800 Subject: [PATCH 3/4] fix(proxy): fix wrong package imports --- .../remoting/RemotingProtocolServer.java | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index fe73f852f8c..12cf7481490 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -19,6 +19,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.auth.config.AuthConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,7 +37,15 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.activity.*; +import org.apache.rocketmq.proxy.remoting.activity.AckMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.ChangeInvisibleTimeActivity; +import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity; +import org.apache.rocketmq.proxy.remoting.activity.ConsumerManagerActivity; +import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity; +import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.AuthorizationPipeline; @@ -41,14 +55,15 @@ import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.netty.*; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import java.util.List; -import java.util.concurrent.*; - public class RemotingProtocolServer extends ProxyServerBase implements RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); From e70d8b290b43cfd507b6a9d2a135faeaef47d6c4 Mon Sep 17 00:00:00 2001 From: xiaoma20082008 <10076393+xiaoma20082008@users.noreply.github.com> Date: Mon, 4 Nov 2024 23:25:01 +0800 Subject: [PATCH 4/4] feat(proxy): format code --- .../apache/rocketmq/proxy/ProxyStartup.java | 13 ++++----- .../rocketmq/proxy/grpc/GrpcServer.java | 4 +-- .../proxy/grpc/GrpcServerBuilder.java | 29 +++++++++---------- .../proxy/mixed/MixedProxyServer.java | 8 ++--- .../proxy/mixed/MixedProxyServerBuilder.java | 6 ++-- .../RemotingProtocolServerBuilder.java | 1 - .../rocketmq/proxy/spi/ProxyServer.java | 3 +- .../rocketmq/proxy/spi/ProxyServerBase.java | 4 +-- .../proxy/spi/ProxyServerFactory.java | 4 +-- .../proxy/spi/ProxyServerFactoryBase.java | 6 ++-- .../proxy/spi/ProxyServerInitializer.java | 16 +++++----- ...ache.rocketmq.proxy.spi.ProxyServerFactory | 18 ++++++++++++ .../rocketmq/proxy/ProxyStartupTest.java | 17 ++++++----- 13 files changed, 66 insertions(+), 63 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 07afe8a26db..92d0e877a90 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.proxy; +import java.util.Date; +import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; @@ -35,9 +37,6 @@ import org.apache.rocketmq.proxy.spi.ProxyServerInitializer; import org.apache.rocketmq.srvutil.ServerUtil; -import java.util.Date; -import java.util.List; - public class ProxyStartup { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown(); @@ -56,9 +55,9 @@ public static void main(String[] args) { ProxyServerFactory factory = ServiceProvider.loadClass(ProxyServerFactory.class); ProxyServer server = factory - .withInitializer(new ProxyServerInitializer(commandLineArgument)) - .withAccessValidators(loadAccessValidators()) - .get(); + .withInitializer(new ProxyServerInitializer(commandLineArgument)) + .withAccessValidators(loadAccessValidators()) + .get(); server.getStartAndShutdowns().forEach(PROXY_START_AND_SHUTDOWN::appendStartAndShutdown); @@ -95,7 +94,7 @@ protected static List loadAccessValidators() { protected static CommandLineArgument parseCommandLineArgument(String[] args) { CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args, - buildCommandlineOptions(), new DefaultParser()); + buildCommandlineOptions(), new DefaultParser()); if (commandLine == null) { throw new RuntimeException("parse command line argument failed"); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java index 73406fa335f..caab6ae60a7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java @@ -14,17 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.grpc; import io.grpc.Server; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.spi.ProxyServerBase; -import java.util.concurrent.TimeUnit; - public class GrpcServer extends ProxyServerBase { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index 94ea64fa672..20a2441e6f4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -23,6 +23,8 @@ import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; import io.grpc.protobuf.services.ChannelzService; import io.grpc.protobuf.services.ProtoReflectionService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.StartAndShutdown; @@ -37,9 +39,6 @@ import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - public class GrpcServerBuilder extends ProxyServerFactoryBase { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -65,12 +64,12 @@ public void start() throws Exception { // NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port); serverBuilder.addService(application) - .addService(ChannelzService.newInstance(100)) - .addService(ProtoReflectionService.newInstance()) - .intercept(new AuthenticationInterceptor(validators)) - .intercept(new GlobalExceptionInterceptor()) - .intercept(new ContextInterceptor()) - .intercept(new HeaderInterceptor()); + .addService(ChannelzService.newInstance(100)) + .addService(ProtoReflectionService.newInstance()) + .intercept(new AuthenticationInterceptor(validators)) + .intercept(new GlobalExceptionInterceptor()) + .intercept(new ContextInterceptor()) + .intercept(new HeaderInterceptor()); serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); @@ -93,7 +92,7 @@ public void start() throws Exception { } serverBuilder.maxInboundMessageSize(maxInboundMessageSize) - .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS); + .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS); log.info( "grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, " @@ -108,11 +107,11 @@ private ThreadPoolExecutor createServerExecutor() { int threadPoolNums = config.getGrpcThreadPoolNums(); int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity(); return ThreadPoolMonitor.createAndMonitor( - threadPoolNums, - threadPoolNums, - 1, TimeUnit.MINUTES, - "GrpcRequestExecutorThread", - threadPoolQueueCapacity + threadPoolNums, + threadPoolNums, + 1, TimeUnit.MINUTES, + "GrpcRequestExecutorThread", + threadPoolQueueCapacity ); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java index 53d9b185363..02a46749f48 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java @@ -14,17 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.mixed; -import org.apache.rocketmq.common.utils.StartAndShutdown; -import org.apache.rocketmq.proxy.spi.ProxyServer; -import org.apache.rocketmq.proxy.spi.ProxyServerBase; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; public class MixedProxyServer extends ProxyServerBase { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java index 4a55730e5e7..80f89d1d9d9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java @@ -14,18 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.mixed; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.common.utils.ServiceProvider; import org.apache.rocketmq.proxy.spi.ProxyServer; import org.apache.rocketmq.proxy.spi.ProxyServerBase; import org.apache.rocketmq.proxy.spi.ProxyServerFactory; import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; -import java.util.ArrayList; -import java.util.List; - public class MixedProxyServerBuilder extends ProxyServerFactoryBase { @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java index 42493603316..8fc2617492d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.remoting; import org.apache.rocketmq.proxy.spi.ProxyServerBase; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java index a0ab182d7f7..f2e5ef92761 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java @@ -16,12 +16,11 @@ */ package org.apache.rocketmq.proxy.spi; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import java.util.List; - public interface ProxyServer extends StartAndShutdown { BrokerController getBrokerController(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java index d77365e90d6..c6a7f5a2d44 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java @@ -14,15 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.spi; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import java.util.List; - public abstract class ProxyServerBase implements ProxyServer { private BrokerController brokerController; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java index de1042271cd..7d5f77e90f1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java @@ -14,13 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.spi; -import org.apache.rocketmq.acl.AccessValidator; - import java.util.List; import java.util.function.Supplier; +import org.apache.rocketmq.acl.AccessValidator; public interface ProxyServerFactory extends Supplier { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java index 204f3c096b0..3b5ce78ff95 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java @@ -14,14 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.spi; -import org.apache.rocketmq.acl.AccessValidator; -import org.apache.rocketmq.common.utils.StartAndShutdown; - import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.utils.StartAndShutdown; public abstract class ProxyServerFactoryBase implements ProxyServerFactory { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java index 7f165e3c7bd..511bef21fe4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.spi; import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerStartup; @@ -34,10 +36,6 @@ import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - public class ProxyServerInitializer { private final CommandLineArgument commandLineArgument; @@ -118,10 +116,10 @@ private void validateConfiguration() throws Exception { private void initializeThreadPoolMonitor() { ProxyConfig config = ConfigurationManager.getProxyConfig(); ThreadPoolMonitor.config( - LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), - LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), - config.isEnablePrintJstack(), config.getPrintJstackInMillis(), - config.getPrintThreadPoolStatusInMillis()); + LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), + LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), + config.isEnablePrintJstack(), config.getPrintJstackInMillis(), + config.getPrintThreadPoolStatusInMillis()); ThreadPoolMonitor.init(); } diff --git a/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory index 0b484543214..1db063b17b6 100644 --- a/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory +++ b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.rocketmq.proxy.grpc.GrpcServerBuilder org.apache.rocketmq.proxy.remoting.RemotingProtocolServerBuilder org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java index 6e3982ea835..f78de87dec9 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java @@ -19,6 +19,16 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.UUID; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.proxy.config.Configuration; import org.junit.After; @@ -28,13 +38,6 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Iterator; -import java.util.UUID; - import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME; import static org.junit.Assert.assertEquals;