Skip to content

Commit

Permalink
Merge branch 'main' into 29-multisearch
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Zemerick <[email protected]>
  • Loading branch information
jzonthemtn authored Oct 3, 2024
2 parents 9bb33ce + dca7e02 commit b7afe28
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ build
build-idea/
out/

volumes/
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ ARG UBI_VERSION="3.0.0.0-SNAPSHOT"

COPY ./build/distributions/opensearch-ubi-${UBI_VERSION}.zip /tmp/

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-${UBI_VERSION}.zip
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch telemetry-otel
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-${UBI_VERSION}.zip
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ For details on the JSON Schema used by UBI to send and receive queries and event
* [Query Response Schema](https://o19s.github.io/ubi/docs/html/query.response.schema.html)
* [Event Schema](https://o19s.github.io/ubi/docs/html/event.schema.html)

## UBI, Data Prepper, and Open Telemetry

The UBI plugin can store UBI query data in one of three ways:

- By directly indexing the UBI query data in the `ubi_queries` index in the same OpenSearch cluster as the plugin.
- By sending the UBI query data as JSON to a Data Prepper [http](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http/) source. The Data Prepper endpoint is provided via the `ubi.dataprepper.url` setting.
- By sending the UBI query data as Open Telemetry traces. This utilizes the native OpenSearch OTel capabilities which are exposed via the `TelemetryAwarePlugin` interface. As UBI queries are received, trace events will be generated. OpenSearch must be configured as described in [Distributed tracing](https://opensearch.org/docs/latest/observing-your-data/trace/distributed-tracing/) for the events to be sent.

## Getting Help

* Start with the [Documentation](https://opensearch.org/docs/latest/search-plugins/ubi/index/) site to how to use this plugin.
* For questions or help getting started, please find us in the [OpenSearch Slack](https://opensearch.org/slack.html) in the `#plugins` channel.
* For bugs or feature requests, please create [a new issue](https://github.com/o19s/opensearch-ubi/issues/new/choose).

Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ thirdPartyAudit.enabled = false

dependencies {
runtimeOnly "org.apache.logging.log4j:log4j-core:${versions.log4j}"
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "commons-logging:commons-logging:${versions.commonslogging}"

yamlRestTestImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- 2021:2021
volumes:
- ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
networks:
- ubi-dev-os-net

Expand All @@ -24,6 +24,16 @@ services:
logger.level: info
OPENSEARCH_INITIAL_ADMIN_PASSWORD: SuperSecretPassword_123
#ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# otel
telemetry.feature.tracer.enabled: true
telemetry.tracer.enabled: true
telemetry.tracer.sampler.probability: 1.0
opensearch.experimental.feature.telemetry.enabled: true
telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.logging.LoggingSpanExporter
telemetry.otel.tracer.exporter.batch_size: 1
telemetry.otel.tracer.exporter.max_queue_size: 3
volumes:
- "./volumes/logs:/usr/share/opensearch/logs"
ulimits:
memlock:
soft: -1
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
opensearchVersion = 3.0.0-SNAPSHOT
ubiVersion = 3.0.0.0-SNAPSHOT
ubiVersion = 3.0.0.0-SNAPSHOT
30 changes: 29 additions & 1 deletion src/main/java/org/opensearch/ubi/UbiActionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.opensearch.env.Environment;
import org.opensearch.search.SearchHit;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.ubi.ext.UbiParameters;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -67,15 +70,18 @@ public class UbiActionFilter implements ActionFilter {

private final Client client;
private final Environment environment;
private final Tracer tracer;

/**
* Creates a new filter.
* @param client An OpenSearch {@link Client}.
* @param environment The OpenSearch {@link Environment}.
* @param tracer An Open Telemetry {@link Tracer tracer}.
*/
public UbiActionFilter(Client client, Environment environment) {
public UbiActionFilter(Client client, Environment environment, Tracer tracer) {
this.client = client;
this.environment = environment;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -319,4 +325,26 @@ private String getResourceFile(final String fileName) {
}
}

private void sendOtelTrace(final Task task, final Tracer tracer, final QueryRequest queryRequest) {

final Span span = tracer.startSpan(SpanBuilder.from(task, "ubi_search"));

span.addAttribute("ubi.user_id", queryRequest.getQueryId());
span.addAttribute("ubi.query", queryRequest.getQuery());
span.addAttribute("ubi.user_query", queryRequest.getUserQuery());
span.addAttribute("ubi.client_id", queryRequest.getClientId());
span.addAttribute("ubi.timestamp", queryRequest.getTimestamp());

for (final String key : queryRequest.getQueryAttributes().keySet()) {
span.addAttribute("ubi.attribute." + key, queryRequest.getQueryAttributes().get(key));
}

span.addAttribute("ubi.query_response.response_id", queryRequest.getQueryResponse().getQueryResponseId());
span.addAttribute("ubi.query_response.query_id", queryRequest.getQueryResponse().getQueryId());
span.addAttribute("ubi.query_response.response_id", String.join(",", queryRequest.getQueryResponse().getQueryResponseObjectIds()));

span.endSpan();

}

}
12 changes: 8 additions & 4 deletions src/main/java/org/opensearch/ubi/UbiPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.ubi.ext.UbiParametersExtBuilder;
import org.opensearch.watcher.ResourceWatcherService;
Expand All @@ -38,7 +40,7 @@
/**
* OpenSearch User Behavior Insights
*/
public class UbiPlugin extends Plugin implements ActionPlugin, SearchPlugin {
public class UbiPlugin extends Plugin implements ActionPlugin, SearchPlugin, TelemetryAwarePlugin {

private ActionFilter ubiActionFilter;

Expand Down Expand Up @@ -69,10 +71,12 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry
) {

this.ubiActionFilter = new UbiActionFilter(client, environment);
this.ubiActionFilter = new UbiActionFilter(client, environment, tracer);
return Collections.emptyList();

}
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/org/opensearch/ubi/UbiActionFilterTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.ubi.ext.UbiParameters;
import org.opensearch.ubi.ext.UbiParametersExtBuilder;
Expand Down Expand Up @@ -61,7 +62,7 @@ public void testApplyWithoutUbiBlock() {
final ActionFuture<IndicesExistsResponse> actionFuture = mock(ActionFuture.class);
when(indicesAdminClient.exists(any(IndicesExistsRequest.class))).thenReturn(actionFuture);

final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment);
final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment, NoopTracer.INSTANCE);
final ActionListener<SearchResponse> listener = mock(ActionListener.class);

final SearchRequest request = mock(SearchRequest.class);
Expand Down Expand Up @@ -118,7 +119,7 @@ public void testApplyWithUbiBlockWithoutQueryId() {
final ActionFuture<IndicesExistsResponse> actionFuture = mock(ActionFuture.class);
when(indicesAdminClient.exists(any(IndicesExistsRequest.class))).thenReturn(actionFuture);

final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment);
final UbiActionFilter ubiActionFilter = new UbiActionFilter(client, environment, NoopTracer.INSTANCE);
final ActionListener<SearchResponse> listener = mock(ActionListener.class);

final SearchRequest request = mock(SearchRequest.class);
Expand Down

0 comments on commit b7afe28

Please sign in to comment.