Skip to content

Commit

Permalink
Filters: register metrics on Prometheus (#129)
Browse files Browse the repository at this point in the history
Add metrics to track the processing time:
pulsar_jmsfilter_processingtime_ondispatch
pulsar_jmsfilter_preprocessing_time_onpublish
pulsar_jmsfilter_processing_time_onpublish
  • Loading branch information
eolivelli authored Apr 16, 2024
1 parent 1bd6606 commit b9f5bde
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package com.datastax.oss.pulsar.jms.selectors;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -48,10 +51,40 @@
public class JMSFilter implements EntryFilter {

private final ConcurrentHashMap<String, SelectorSupport> selectors = new ConcurrentHashMap<>();
private static final Histogram filterProcessingTime =
Histogram.build()
.name("pulsar_jmsfilter_processingtime_ondispatch")
.help(
"Time taken to compute filters on the broker while dispatching messages to consumers")
.labelNames("topic", "subscription")
.create();

private static final AtomicBoolean metricRegistered = new AtomicBoolean(false);

JMSFilter(boolean registerMetrics) {
if (registerMetrics) {
if (metricRegistered.compareAndSet(false, true)) {
log.info("Registering JMSFilter metrics");
CollectorRegistry.defaultRegistry.register(filterProcessingTime);
}
}
}

public JMSFilter() {
// Pulsar 3.x created multiple instances of the filter, one per Dispatcher
this(true);
}

@Override
public FilterResult filterEntry(Entry entry, FilterContext context) {
return filterEntry(entry, context, false);
long start = System.nanoTime();
try {
return filterEntry(entry, context, false);
} finally {
filterProcessingTime
.labels(context.getSubscription().getTopicName(), context.getSubscription().getName())
.observe(System.nanoTime() - start);
}
}

public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMessagePublish) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.datastax.oss.pulsar.jms.selectors;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
Expand Down Expand Up @@ -49,7 +51,23 @@
@Slf4j
public class JMSPublishFilters implements BrokerInterceptor {
private static final String JMS_FILTER_METADATA = "jms-msg-metadata";
private final JMSFilter filter = new JMSFilter();

private static final Histogram filterProcessingTimeOnPublish =
Histogram.build()
.name("pulsar_jmsfilter_preprocessing_time_onpublish")
.help(
"Time taken to pre-process the message on the broker while accepting messages from producers before applying filters")
.labelNames("topic")
.create();
private static final Histogram filterProcessingTimeOnProduce =
Histogram.build()
.name("pulsar_jmsfilter_processing_time_onpublish")
.help(
"Time taken to process the message on the broker while accepting messages from producers and applying filters")
.labelNames("topic", "subscription")
.create();

private final JMSFilter filter = new JMSFilter(false);
private boolean enabled = false;

private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -85,6 +103,15 @@ public void initialize(PulsarService pulsarService) {
.getProperties()
.getProperty("jmsApplyFiltersOnPublish", "true"));
log.info("jmsApplyFiltersOnPublish={}", enabled);

try {
log.info("Registering JMSFilter metrics");
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish);
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce);
} catch (IllegalArgumentException alreadyRegistered) {
// ignore
log.info("Filter metrics already registered", alreadyRegistered);
}
}

@Override
Expand All @@ -98,28 +125,33 @@ public void onMessagePublish(
|| publishContext.getNumberOfMessages() > 1) {
return;
}
long now = System.nanoTime();
try {
for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
if (!(subscription instanceof PersistentSubscription)) {
continue;
}
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
if (!subscriptionProperties.containsKey("jms.selector")) {
continue;
}

for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
if (!(subscription instanceof PersistentSubscription)) {
continue;
}
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
if (!subscriptionProperties.containsKey("jms.selector")) {
continue;
}

// we must make a copy because the ByteBuf will be released
MessageMetadata messageMetadata =
new MessageMetadata()
.copyFrom(
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1));
// we must make a copy because the ByteBuf will be released
MessageMetadata messageMetadata =
new MessageMetadata()
.copyFrom(
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1));

publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
// as soon as we find a good reason to apply the filters in messageProduced
// we can exit
return;
publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
// as soon as we find a good reason to apply the filters in messageProduced
// we can exit
return;
}
} finally {
filterProcessingTimeOnPublish
.labels(producer.getTopic().getName())
.observe(System.nanoTime() - now);
}
;
}

@Override
Expand All @@ -141,31 +173,40 @@ public void messageProduced(
if (messageMetadata.hasNumMessagesInBatch()) {
return;
}

for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
scheduleOnDispatchThread(
subscription,
() -> {
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext, true);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug(
"Reject message {}:{} for subscription {}",
ledgerId,
entryId,
subscription.getName());
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult =
filter.filterEntry(entry, filterContext, true);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug(
"Reject message {}:{} for subscription {}",
ledgerId,
entryId,
subscription.getName());
}
// ir is possible that calling this method in this thread may affect
// performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
// ir is possible that calling this method in this thread may affect
// performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
} finally {
filterProcessingTimeOnProduce
.labels(producer.getTopic().getName(), subscription.getName())
.observe(System.nanoTime() - now);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down Expand Up @@ -103,6 +105,18 @@ public void beforeAll(ExtensionContext extensionContext) {
PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080))
.build();
Awaitility.await()
.until(
() -> {
List<String> tenants = admin.tenants().getTenants();
log.info("Tenants: {}", tenants);
if (!tenants.contains("public")) {
return false;
}
List<String> namespaces = admin.namespaces().getNamespaces("public");
log.info("Namespaces: {}", namespaces);
return namespaces.contains("public/default");
});
if (onContainerReady != null) {
onContainerReady.accept(this);
}
Expand Down

0 comments on commit b9f5bde

Please sign in to comment.