Skip to content

Commit

Permalink
more config caching
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Apr 12, 2024
1 parent 21b91db commit 0d20c1e
Showing 1 changed file with 70 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
Expand All @@ -57,10 +58,19 @@
@Slf4j
public class BrokerTracing implements BrokerInterceptor {

public enum EventReasons {
ADMINISTRATIVE,
MESSAGE,
TRANSACTION,
SERVLET,
}

private static final TraceLevel defaultTraceLevel = TraceLevel.BASIC;

private static final LoadingCache<PulsarService, Set<EventReasons>> jmsTracingEventList =
CacheBuilder.newBuilder()
.maximumSize(10)
.expireAfterWrite(1, java.util.concurrent.TimeUnit.MINUTES)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build(
new CacheLoader<PulsarService, Set<EventReasons>>() {
public Set<EventReasons> load(PulsarService pulsar) {
Expand Down Expand Up @@ -89,7 +99,7 @@ public Set<EventReasons> load(PulsarService pulsar) {
private static final LoadingCache<PulsarService, TraceLevel> traceLevelForService =
CacheBuilder.newBuilder()
.maximumSize(10)
.expireAfterWrite(1, java.util.concurrent.TimeUnit.MINUTES)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build(
new CacheLoader<PulsarService, TraceLevel>() {
public TraceLevel load(PulsarService pulsar) {
Expand All @@ -108,14 +118,50 @@ public TraceLevel load(PulsarService pulsar) {
}
});

public enum EventReasons {
ADMINISTRATIVE,
MESSAGE,
TRANSACTION,
SERVLET,
}
private static final LoadingCache<Subscription, TraceLevel> traceLevelForSubscription =
CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.build(
new CacheLoader<Subscription, TraceLevel>() {
public TraceLevel load(Subscription sub) {
if (sub.getSubscriptionProperties() == null
|| !sub.getSubscriptionProperties().containsKey("trace")) {
return TraceLevel.NONE;
}

private static final TraceLevel defaultTraceLevel = TraceLevel.BASIC;
try {
return TraceLevel.valueOf(sub.getSubscriptionProperties().get("trace"));
} catch (IllegalArgumentException e) {
log.warn(
"Invalid tracing level: {}. Setting to NONE for subscription {}",
sub.getSubscriptionProperties().get("trace"),
sub);
return TraceLevel.NONE;
}
}
});

private static final LoadingCache<Producer, TraceLevel> traceLevelForProducer =
CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.build(
new CacheLoader<Producer, TraceLevel>() {
public TraceLevel load(Producer producer) {
if (producer.getMetadata() == null
|| !producer.getMetadata().containsKey("trace")) {
return TraceLevel.NONE;
}
try {
return TraceLevel.valueOf(producer.getMetadata().get("trace"));
} catch (IllegalArgumentException e) {
log.warn(
"Invalid tracing level: {}. Setting to NONE for producer {}",
producer.getMetadata().get("trace"),
producer);
return TraceLevel.NONE;
}
}
});

public void initialize(PulsarService pulsarService) {
log.info("Initializing BrokerTracing");
Expand All @@ -127,10 +173,14 @@ public void close() {
}

private Set<EventReasons> getEnabledEvents(ServerCnx cnx) {
if (cnx == null) return new HashSet<>();

return getEnabledEvents(cnx.getBrokerService().getPulsar());
}

private Set<EventReasons> getEnabledEvents(PulsarService pulsar) {
if (pulsar == null) return new HashSet<>();

try {
return jmsTracingEventList.get(pulsar);
} catch (ExecutionException e) {
Expand All @@ -140,6 +190,7 @@ private Set<EventReasons> getEnabledEvents(PulsarService pulsar) {
}

private static TraceLevel getTracingLevel(ServerCnx cnx) {
if (cnx == null) return defaultTraceLevel;

try {
return traceLevelForService.get(cnx.getBrokerService().getPulsar());
Expand All @@ -150,35 +201,23 @@ private static TraceLevel getTracingLevel(ServerCnx cnx) {
}

private static TraceLevel getTracingLevel(Subscription sub) {
if (sub == null
|| sub.getSubscriptionProperties() == null
|| !sub.getSubscriptionProperties().containsKey("trace")) {
return TraceLevel.NONE;
}
if (sub == null) return TraceLevel.NONE;

try {
return TraceLevel.valueOf(sub.getSubscriptionProperties().get("trace"));
} catch (IllegalArgumentException e) {
log.warn(
"Invalid tracing level: {}. Setting to NONE for subscription {}",
sub.getSubscriptionProperties().get("trace"),
sub);
return traceLevelForSubscription.get(sub);
} catch (ExecutionException e) {
log.error("Error getting tracing level", e);
return TraceLevel.NONE;
}
}

private static TraceLevel getTracingLevel(Producer producer) {
if (producer == null
|| producer.getMetadata() == null
|| !producer.getMetadata().containsKey("trace")) {
return TraceLevel.NONE;
}
if (producer == null) return TraceLevel.NONE;

try {
return TraceLevel.valueOf(producer.getMetadata().get("trace"));
} catch (IllegalArgumentException e) {
log.warn(
"Invalid tracing level: {}. Setting to NONE for producer {}",
producer.getMetadata().get("trace"),
producer);
return traceLevelForProducer.get(producer);
} catch (ExecutionException e) {
log.error("Error getting tracing level", e);
return TraceLevel.NONE;
}
}
Expand Down

0 comments on commit 0d20c1e

Please sign in to comment.