From 0d20c1e409620b7f8daf4932a147697bf77c00a7 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 12 Apr 2024 16:42:36 -0700 Subject: [PATCH] more config caching --- .../oss/pulsar/jms/tracing/BrokerTracing.java | 101 ++++++++++++------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java index d3b7f248..3e0b91b8 100644 --- a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java +++ b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.servlet.ServletException; import javax.servlet.ServletRequest; @@ -57,10 +58,19 @@ @Slf4j public class BrokerTracing implements BrokerInterceptor { + public enum EventReasons { + ADMINISTRATIVE, + MESSAGE, + TRANSACTION, + SERVLET, + } + + private static final TraceLevel defaultTraceLevel = TraceLevel.BASIC; + private static final LoadingCache> jmsTracingEventList = CacheBuilder.newBuilder() .maximumSize(10) - .expireAfterWrite(1, java.util.concurrent.TimeUnit.MINUTES) + .expireAfterWrite(1, TimeUnit.MINUTES) .build( new CacheLoader>() { public Set load(PulsarService pulsar) { @@ -89,7 +99,7 @@ public Set load(PulsarService pulsar) { private static final LoadingCache traceLevelForService = CacheBuilder.newBuilder() .maximumSize(10) - .expireAfterWrite(1, java.util.concurrent.TimeUnit.MINUTES) + .expireAfterWrite(1, TimeUnit.MINUTES) .build( new CacheLoader() { public TraceLevel load(PulsarService pulsar) { @@ -108,14 +118,50 @@ public TraceLevel load(PulsarService pulsar) { } }); - public enum EventReasons { - ADMINISTRATIVE, - MESSAGE, - TRANSACTION, - SERVLET, - } + private static final LoadingCache traceLevelForSubscription = + CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.SECONDS) + .build( + new CacheLoader() { + public TraceLevel load(Subscription sub) { + if (sub.getSubscriptionProperties() == null + || !sub.getSubscriptionProperties().containsKey("trace")) { + return TraceLevel.NONE; + } - private static final TraceLevel defaultTraceLevel = TraceLevel.BASIC; + try { + return TraceLevel.valueOf(sub.getSubscriptionProperties().get("trace")); + } catch (IllegalArgumentException e) { + log.warn( + "Invalid tracing level: {}. Setting to NONE for subscription {}", + sub.getSubscriptionProperties().get("trace"), + sub); + return TraceLevel.NONE; + } + } + }); + + private static final LoadingCache traceLevelForProducer = + CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.SECONDS) + .build( + new CacheLoader() { + public TraceLevel load(Producer producer) { + if (producer.getMetadata() == null + || !producer.getMetadata().containsKey("trace")) { + return TraceLevel.NONE; + } + try { + return TraceLevel.valueOf(producer.getMetadata().get("trace")); + } catch (IllegalArgumentException e) { + log.warn( + "Invalid tracing level: {}. Setting to NONE for producer {}", + producer.getMetadata().get("trace"), + producer); + return TraceLevel.NONE; + } + } + }); public void initialize(PulsarService pulsarService) { log.info("Initializing BrokerTracing"); @@ -127,10 +173,14 @@ public void close() { } private Set getEnabledEvents(ServerCnx cnx) { + if (cnx == null) return new HashSet<>(); + return getEnabledEvents(cnx.getBrokerService().getPulsar()); } private Set getEnabledEvents(PulsarService pulsar) { + if (pulsar == null) return new HashSet<>(); + try { return jmsTracingEventList.get(pulsar); } catch (ExecutionException e) { @@ -140,6 +190,7 @@ private Set getEnabledEvents(PulsarService pulsar) { } private static TraceLevel getTracingLevel(ServerCnx cnx) { + if (cnx == null) return defaultTraceLevel; try { return traceLevelForService.get(cnx.getBrokerService().getPulsar()); @@ -150,35 +201,23 @@ private static TraceLevel getTracingLevel(ServerCnx cnx) { } private static TraceLevel getTracingLevel(Subscription sub) { - if (sub == null - || sub.getSubscriptionProperties() == null - || !sub.getSubscriptionProperties().containsKey("trace")) { - return TraceLevel.NONE; - } + if (sub == null) return TraceLevel.NONE; + try { - return TraceLevel.valueOf(sub.getSubscriptionProperties().get("trace")); - } catch (IllegalArgumentException e) { - log.warn( - "Invalid tracing level: {}. Setting to NONE for subscription {}", - sub.getSubscriptionProperties().get("trace"), - sub); + return traceLevelForSubscription.get(sub); + } catch (ExecutionException e) { + log.error("Error getting tracing level", e); return TraceLevel.NONE; } } private static TraceLevel getTracingLevel(Producer producer) { - if (producer == null - || producer.getMetadata() == null - || !producer.getMetadata().containsKey("trace")) { - return TraceLevel.NONE; - } + if (producer == null) return TraceLevel.NONE; + try { - return TraceLevel.valueOf(producer.getMetadata().get("trace")); - } catch (IllegalArgumentException e) { - log.warn( - "Invalid tracing level: {}. Setting to NONE for producer {}", - producer.getMetadata().get("trace"), - producer); + return traceLevelForProducer.get(producer); + } catch (ExecutionException e) { + log.error("Error getting tracing level", e); return TraceLevel.NONE; } }