Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#529 EventBusAppender instance pollution in Java Heap Space #530

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.swisspush.gateleen.core.util.*;
import org.swisspush.gateleen.hook.queueingstrategy.*;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationManager;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
Expand Down Expand Up @@ -107,6 +108,7 @@
private final ResourceStorage hookStorage;
private final MonitoringHandler monitoringHandler;
private final LoggingResourceManager loggingResourceManager;
private final LogAppenderRepository logAppenderRepository;
private final HttpClient selfClient;
private final String userProfilePath;
private final String hookRootUri;
Expand Down Expand Up @@ -137,9 +139,9 @@
* @param hookRootUri hookRootUri
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 144 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L144

Added line #L144 was not covered by tests
new QueueClient(vertx, monitoringHandler));
}

Expand All @@ -157,16 +159,16 @@
* @param requestQueue requestQueue
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 164 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L164

Added line #L164 was not covered by tests
requestQueue, false);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 171 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L171

Added line #L171 was not covered by tests
requestQueue, false, null);
}

Expand All @@ -185,18 +187,18 @@
* @param reducedPropagationManager reducedPropagationManager
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, null, storage);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, doneHandler, hookStorage, Router.DEFAULT_ROUTER_MULTIPLIER);
}

Expand All @@ -220,7 +222,7 @@
* the number of {@link Router} instances within a cluster
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage userProfileStorage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage,
int routeMultiplier) {
Expand All @@ -229,6 +231,7 @@
this.selfClient = selfClient;
this.userProfileStorage = userProfileStorage;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;
this.monitoringHandler = monitoringHandler;
this.userProfilePath = userProfilePath;
this.hookRootUri = hookRootUri;
Expand Down Expand Up @@ -1618,7 +1621,8 @@
* @return Route
*/
private Route createRoute(String urlPattern, HttpHook hook) {
return new Route(vertx, userProfileStorage, loggingResourceManager, monitoringHandler, userProfilePath, hook, urlPattern, selfClient);
return new Route(vertx, userProfileStorage, loggingResourceManager, logAppenderRepository, monitoringHandler,
userProfilePath, hook, urlPattern, selfClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.storage.ResourceStorage;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.routing.Forwarder;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class Route {

private Vertx vertx;
private LoggingResourceManager loggingResourceManager;
private LogAppenderRepository logAppenderRepository;
private MonitoringHandler monitoringHandler;
private String userProfilePath;
private ResourceStorage storage;
Expand Down Expand Up @@ -76,10 +78,12 @@ public class Route {
* @param httpHook httpHook
* @param urlPattern - this can be a listener or a normal urlPattern (eg. for a route)
*/
public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler, String userProfilePath, HttpHook httpHook, String urlPattern, HttpClient selfClient) {
public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository,
MonitoringHandler monitoringHandler, String userProfilePath, HttpHook httpHook, String urlPattern, HttpClient selfClient) {
this.vertx = vertx;
this.storage = storage;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;
this.monitoringHandler = monitoringHandler;
this.userProfilePath = userProfilePath;
this.httpHook = httpHook;
Expand All @@ -97,7 +101,8 @@ public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggin
* Creates the forwarder for this hook.
*/
private void createForwarder() {
forwarder = new Forwarder(vertx, client, rule, storage, loggingResourceManager, monitoringHandler, userProfilePath, null);
forwarder = new Forwarder(vertx, client, rule, storage, loggingResourceManager, logAppenderRepository,
monitoringHandler, userProfilePath, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.swisspush.gateleen.core.http.*;
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationManager;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class HookHandlerTest {
private HttpClient httpClient;
private MockResourceStorage storage;
private LoggingResourceManager loggingResourceManager;
private LogAppenderRepository logAppenderRepository;
private MonitoringHandler monitoringHandler;
private RequestQueue requestQueue;
private ReducedPropagationManager reducedPropagationManager;
Expand All @@ -66,12 +68,13 @@ public void setUp() {
Mockito.when(httpClient.request(any(HttpMethod.class), anyString())).thenReturn(Mockito.mock(Future.class));
storage = new MockResourceStorage();
loggingResourceManager = Mockito.mock(LoggingResourceManager.class);
logAppenderRepository = Mockito.mock(LogAppenderRepository.class);
monitoringHandler = Mockito.mock(MonitoringHandler.class);
requestQueue = Mockito.mock(RequestQueue.class);
reducedPropagationManager = Mockito.mock(ReducedPropagationManager.class);


hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, monitoringHandler,
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler,
"userProfilePath", HOOK_ROOT_URI, requestQueue, false, reducedPropagationManager);
hookHandler.init();
}
Expand Down Expand Up @@ -218,7 +221,7 @@ public boolean matches(Object argument) {

@Test
public void testListenerEnqueueWithReducedPropagationQueueingStrategyButNoManager(TestContext context) throws InterruptedException {
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, monitoringHandler,
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler,
"userProfilePath", HOOK_ROOT_URI, requestQueue, false, null);
hookHandler.init();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.swisspush.gateleen.logging;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import org.apache.logging.log4j.core.Appender;

import java.util.HashMap;
import java.util.Map;

import static org.swisspush.gateleen.logging.LoggingResourceManager.UPDATE_ADDRESS;

/**
* Default implementation of the {@link LogAppenderRepository} caching the {@link Appender} instances in a {@link Map}
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class DefaultLogAppenderRepository implements LogAppenderRepository {

private Map<String, Appender> appenderMap = new HashMap<>();

public DefaultLogAppenderRepository(Vertx vertx) {
vertx.eventBus().consumer(UPDATE_ADDRESS, (Handler<Message<Boolean>>) event -> clearRepository());
}

@Override
public boolean hasAppender(String name) {
return appenderMap.containsKey(name);
}

@Override
public void addAppender(String name, Appender appender) {
appenderMap.put(name, appender);
}

@Override
public Appender getAppender(String name) {
return appenderMap.get(name);
}

@Override
public void clearRepository() {
appenderMap.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.swisspush.gateleen.logging;

import org.apache.logging.log4j.core.Appender;

/**
* A repository holding {@link Appender} instances. The repository allows to reuse an appender
* instead of creating a new one for every log statement
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface LogAppenderRepository {

boolean hasAppender(String name);

void addAppender(String name, Appender appender);

Appender getAppender(String name);

void clearRepository();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
private Buffer responsePayload;
private LoggingResource loggingResource;
private EventBus eventBus;
private LogAppenderRepository logAppenderRepository;

private String currentDestination;

Expand All @@ -64,11 +65,11 @@
private static final String DEFAULT = "default";

private Map<String, org.apache.logging.log4j.Logger> loggers = new HashMap<>();
private Map<String, Appender> appenders = new HashMap<>();

private Logger log;

public LoggingHandler(LoggingResourceManager loggingResourceManager, HttpServerRequest request, EventBus eventBus) {
public LoggingHandler(LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, HttpServerRequest request, EventBus eventBus) {
this.logAppenderRepository = logAppenderRepository;
this.request = request;
this.eventBus = eventBus;
this.loggingResource = loggingResourceManager.getLoggingResource();
Expand Down Expand Up @@ -187,7 +188,7 @@
* @return
*/
private Appender getEventBusAppender(String filterDestination, Map<String, String> destinationOptions) {
if (!appenders.containsKey(filterDestination)) {
if (!logAppenderRepository.hasAppender(filterDestination)) {

/*
* <appender name="requestLogEventBusAppender" class="EventBusAppender">
Expand All @@ -204,9 +205,9 @@
.add(META_DATA, destinationOptions.get(META_DATA)))
.setTransmissionMode(EventBusWriter.TransmissionMode.fromString(destinationOptions.get(TRANSMISSION)))
.setLayout(PatternLayout.createDefaultLayout()).build();
appenders.put(filterDestination, appender);
logAppenderRepository.addAppender(filterDestination, appender);
}
return appenders.get(filterDestination);
return logAppenderRepository.getAppender(filterDestination);
}

/**
Expand All @@ -220,7 +221,7 @@
* @return
*/
private Appender getFileAppender(String filterDestination, String fileName) {
if (!appenders.containsKey(filterDestination)) {
if (!logAppenderRepository.hasAppender(filterDestination)) {

/*
* <appender name="requestLogFileAppender" class="org.apache.log4j.DailyRollingFileAppender">
Expand All @@ -242,10 +243,10 @@
builder.withAppend(true);
PatternLayout layout = PatternLayout.createDefaultLayout();
builder.setLayout(layout);
appenders.put(filterDestination, builder.build());
logAppenderRepository.addAppender(filterDestination, builder.build());

Check warning on line 246 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java#L246

Added line #L246 was not covered by tests
}

return appenders.get(filterDestination);
return logAppenderRepository.getAppender(filterDestination);

Check warning on line 249 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java#L249

Added line #L249 was not covered by tests
}

public void setResponse(HttpClientResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class LoggingResourceManager implements LoggableResource {

private static final String UPDATE_ADDRESS = "gateleen.logging-updated";
static final String UPDATE_ADDRESS = "gateleen.logging-updated";

private final String loggingUri;
private final ResourceStorage storage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
public class RequestLoggingConsumer {
private final Vertx vertx;
private final LoggingResourceManager loggingResourceManager;
private final LogAppenderRepository logAppenderRepository;

public RequestLoggingConsumer(Vertx vertx, LoggingResourceManager loggingResourceManager) {
public RequestLoggingConsumer(Vertx vertx, LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository) {

Check warning on line 29 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L29

Added line #L29 was not covered by tests
this.vertx = vertx;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;

Check warning on line 32 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L32

Added line #L32 was not covered by tests

vertx.eventBus().localConsumer(Address.requestLoggingConsumerAddress(), (Handler<Message<JsonObject>>) event -> {
try {
Expand Down Expand Up @@ -66,7 +68,7 @@
* @param responseHeaders the response headers
*/
private void logRequest(final HttpServerRequest request, final int status, Buffer data, final MultiMap responseHeaders) {
final LoggingHandler loggingHandler = new LoggingHandler(loggingResourceManager, request, vertx.eventBus());
final LoggingHandler loggingHandler = new LoggingHandler(loggingResourceManager, logAppenderRepository, request, vertx.eventBus());

Check warning on line 71 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L71

Added line #L71 was not covered by tests
if (HttpMethod.PUT == request.method() || HttpMethod.POST == request.method()) {
loggingHandler.appendRequestPayload(data);
} else if (HttpMethod.GET == request.method()) {
Expand Down
Loading