Skip to content

Commit

Permalink
Adds OTel support to UBI plugin (#24)
Browse files Browse the repository at this point in the history
* #20 Adding OTel support to UBI plugin.

Signed-off-by: jzonthemtn <[email protected]>

* #20 Updating readme.

Signed-off-by: jzonthemtn <[email protected]>

* #20 Removing wildcard import.

Signed-off-by: jzonthemtn <[email protected]>

* #20 Using NoopTracer instead of null.

Signed-off-by: jzonthemtn <[email protected]>

* #20 Updating readme for ways to send query data to OpenSearch.

Signed-off-by: jzonthemtn <[email protected]>

---------

Signed-off-by: jzonthemtn <[email protected]>
  • Loading branch information
jzonthemtn authored Oct 1, 2024
1 parent 57ab27a commit dca7e02
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ build
build-idea/
out/

volumes/
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM opensearchproject/opensearch:2.14.0
FROM opensearchproject/opensearch:2.16.0

COPY ./build/distributions/opensearch-ubi-2.14.0.0.zip /tmp/
COPY ./build/distributions/opensearch-ubi-2.16.0.0.zip /tmp/

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch telemetry-otel
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-2.16.0.0.zip

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-2.14.0.0.zip
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ For details on the JSON Schema used by UBI to send and receive queries and event
* [Query Response Schema](https://o19s.github.io/ubi/docs/html/query.response.schema.html)
* [Event Schema](https://o19s.github.io/ubi/docs/html/event.schema.html)

## UBI, Data Prepper, and Open Telemetry

The UBI plugin can store UBI query data in one of three ways:

- By directly indexing the UBI query data in the `ubi_queries` index in the same OpenSearch cluster as the plugin.
- By sending the UBI query data as JSON to a Data Prepper [http](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http/) source. The Data Prepper endpoint is provided via the `ubi.dataprepper.url` setting.
- By sending the UBI query data as Open Telemetry traces. This utilizes the native OpenSearch OTel capabilities which are exposed via the `TelemetryAwarePlugin` interface. As UBI queries are received, trace events will be generated. OpenSearch must be configured as described in [Distributed tracing](https://opensearch.org/docs/latest/observing-your-data/trace/distributed-tracing/) for the events to be sent.

## Getting Help

* Start with the [Documentation](https://opensearch.org/docs/latest/search-plugins/ubi/index/) site to how to use this plugin.
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ thirdPartyAudit.enabled = false

dependencies {
runtimeOnly "org.apache.logging.log4j:log4j-core:${versions.log4j}"
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "commons-logging:commons-logging:${versions.commonslogging}"

yamlRestTestImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- 2021:2021
volumes:
- ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
networks:
- ubi-dev-os-net

Expand All @@ -24,6 +24,16 @@ services:
logger.level: info
OPENSEARCH_INITIAL_ADMIN_PASSWORD: SuperSecretPassword_123
#ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# otel
telemetry.feature.tracer.enabled: true
telemetry.tracer.enabled: true
telemetry.tracer.sampler.probability: 1.0
opensearch.experimental.feature.telemetry.enabled: true
telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.logging.LoggingSpanExporter
telemetry.otel.tracer.exporter.batch_size: 1
telemetry.otel.tracer.exporter.max_queue_size: 3
volumes:
- "./volumes/logs:/usr/share/opensearch/logs"
ulimits:
memlock:
soft: -1
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
opensearchVersion = 3.0.0-SNAPSHOT
ubiVersion = 3.0.0.0-SNAPSHOT
ubiVersion = 3.0.0.0-SNAPSHOT
1 change: 0 additions & 1 deletion licenses/jackson-annotations-2.17.1.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions licenses/jackson-annotations-2.17.2.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
147b7b9412ffff24339f8aba080b292448e08698
1 change: 0 additions & 1 deletion licenses/jackson-databind-2.17.1.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions licenses/jackson-databind-2.17.2.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e6deb029e5901e027c129341fac39e515066b68c
36 changes: 31 additions & 5 deletions src/main/java/org/opensearch/ubi/UbiActionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

package org.opensearch.ubi;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
Expand Down Expand Up @@ -39,6 +35,9 @@
import org.opensearch.env.Environment;
import org.opensearch.search.SearchHit;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.ubi.ext.UbiParameters;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -69,15 +68,18 @@ public class UbiActionFilter implements ActionFilter {

private final Client client;
private final Environment environment;
private final Tracer tracer;

/**
* Creates a new filter.
* @param client An OpenSearch {@link Client}.
* @param environment The OpenSearch {@link Environment}.
* @param tracer An Open Telemetry {@link Tracer tracer}.
*/
public UbiActionFilter(Client client, Environment environment) {
public UbiActionFilter(Client client, Environment environment, Tracer tracer) {
this.client = client;
this.environment = environment;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -139,6 +141,8 @@ public void onResponse(Response response) {
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(queryId, userQuery, userId, query, queryAttributes, queryResponse);

sendOtelTrace(task, tracer, queryRequest);

final String dataPrepperUrl = environment.settings().get(UbiSettings.DATA_PREPPER_URL);
if(dataPrepperUrl != null) {
sendToDataPrepper(dataPrepperUrl, queryRequest);
Expand Down Expand Up @@ -303,4 +307,26 @@ private String getResourceFile(final String fileName) {
}
}

private void sendOtelTrace(final Task task, final Tracer tracer, final QueryRequest queryRequest) {

final Span span = tracer.startSpan(SpanBuilder.from(task, "ubi_search"));

span.addAttribute("ubi.user_id", queryRequest.getQueryId());
span.addAttribute("ubi.query", queryRequest.getQuery());
span.addAttribute("ubi.user_query", queryRequest.getUserQuery());
span.addAttribute("ubi.client_id", queryRequest.getClientId());
span.addAttribute("ubi.timestamp", queryRequest.getTimestamp());

for (final String key : queryRequest.getQueryAttributes().keySet()) {
span.addAttribute("ubi.attribute." + key, queryRequest.getQueryAttributes().get(key));
}

span.addAttribute("ubi.query_response.response_id", queryRequest.getQueryResponse().getQueryResponseId());
span.addAttribute("ubi.query_response.query_id", queryRequest.getQueryResponse().getQueryId());
span.addAttribute("ubi.query_response.response_id", String.join(",", queryRequest.getQueryResponse().getQueryResponseObjectIds()));

span.endSpan();

}

}
12 changes: 8 additions & 4 deletions src/main/java/org/opensearch/ubi/UbiPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.ubi.ext.UbiParametersExtBuilder;
import org.opensearch.watcher.ResourceWatcherService;
Expand All @@ -38,7 +40,7 @@
/**
* OpenSearch User Behavior Insights
*/
public class UbiPlugin extends Plugin implements ActionPlugin, SearchPlugin {
public class UbiPlugin extends Plugin implements ActionPlugin, SearchPlugin, TelemetryAwarePlugin {

private ActionFilter ubiActionFilter;

Expand Down Expand Up @@ -69,10 +71,12 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry
) {

this.ubiActionFilter = new UbiActionFilter(client, environment);
this.ubiActionFilter = new UbiActionFilter(client, environment, tracer);
return Collections.emptyList();

}
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/org/opensearch/ubi/UbiActionFilterTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.ubi.ext.UbiParameters;
import org.opensearch.ubi.ext.UbiParametersExtBuilder;
Expand Down Expand Up @@ -61,7 +62,7 @@ public void testApplyWithoutUbiBlock() {
final ActionFuture<IndicesExistsResponse> actionFuture = mock(ActionFuture.class);
when(indicesAdminClient.exists(any(IndicesExistsRequest.class))).thenReturn(actionFuture);

final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment);
final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment, NoopTracer.INSTANCE);
final ActionListener<SearchResponse> listener = mock(ActionListener.class);

final SearchRequest request = mock(SearchRequest.class);
Expand Down Expand Up @@ -118,7 +119,7 @@ public void testApplyWithUbiBlockWithoutQueryId() {
final ActionFuture<IndicesExistsResponse> actionFuture = mock(ActionFuture.class);
when(indicesAdminClient.exists(any(IndicesExistsRequest.class))).thenReturn(actionFuture);

final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment);
final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment, NoopTracer.INSTANCE);
final ActionListener<SearchResponse> listener = mock(ActionListener.class);

final SearchRequest request = mock(SearchRequest.class);
Expand Down

0 comments on commit dca7e02

Please sign in to comment.