Skip to content

Commit

Permalink
Adds support for multisearch (#30)
Browse files Browse the repository at this point in the history
* #29 Adding support for msearch.

Signed-off-by: jzonthemtn <[email protected]>

* #29 Updating docker and dependencies for 3.0.0

Signed-off-by: jzonthemtn <[email protected]>

* #29 Simplifying docker for 3.0.0.

Signed-off-by: jzonthemtn <[email protected]>

* #29 Removing comment and will write issue for it.

Signed-off-by: jzonthemtn <[email protected]>

---------

Signed-off-by: jzonthemtn <[email protected]>
Signed-off-by: Jeff Zemerick <[email protected]>
  • Loading branch information
jzonthemtn authored Oct 4, 2024
1 parent dca7e02 commit 1e92b6d
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 94 deletions.
11 changes: 7 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
FROM opensearchproject/opensearch:2.16.0
FROM opensearchstaging/opensearch:3.0.0

COPY ./build/distributions/opensearch-ubi-2.16.0.0.zip /tmp/
ARG UBI_VERSION="3.0.0.0-SNAPSHOT"

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch telemetry-otel
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-2.16.0.0.zip
COPY ./build/distributions/opensearch-ubi-${UBI_VERSION}.zip /tmp/

# Required for OTel capabilities.
#RUN /usr/share/opensearch/bin/opensearch-plugin install --batch telemetry-otel

RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-ubi-${UBI_VERSION}.zip
63 changes: 24 additions & 39 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
services:

dataprepper-dev-os:
depends_on:
- ubi-dev-os
container_name: dataprepper
image: opensearchproject/data-prepper:2.8.0
ports:
- 4900:4900
- 2021:2021
volumes:
- ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
- ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
networks:
- ubi-dev-os-net
# Uncomment to use OTel or Data Prepper -> OpenSearch pipelines.
# dataprepper-dev-os:
# depends_on:
# - ubi-dev-os
# container_name: dataprepper
# image: opensearchproject/data-prepper:2.8.0
# ports:
# - 4900:4900
# - 2021:2021
# volumes:
# - ./dataprepper/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
# - ./dataprepper/data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
# networks:
# - ubi-dev-os-net

ubi-dev-os:
build: ./
Expand All @@ -23,17 +24,16 @@ services:
plugins.security.disabled: "true"
logger.level: info
OPENSEARCH_INITIAL_ADMIN_PASSWORD: SuperSecretPassword_123
#ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# otel
telemetry.feature.tracer.enabled: true
telemetry.tracer.enabled: true
telemetry.tracer.sampler.probability: 1.0
opensearch.experimental.feature.telemetry.enabled: true
telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.logging.LoggingSpanExporter
telemetry.otel.tracer.exporter.batch_size: 1
telemetry.otel.tracer.exporter.max_queue_size: 3
volumes:
- "./volumes/logs:/usr/share/opensearch/logs"
# Requires the Data Prepper container:
# ubi.dataprepper.url: "http://dataprepper-dev-os:2021/log/ingest"
# Requires the OTel plugin to be installed.
#telemetry.feature.tracer.enabled: true
#telemetry.tracer.enabled: true
#telemetry.tracer.sampler.probability: 1.0
#opensearch.experimental.feature.telemetry.enabled: true
#telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.logging.LoggingSpanExporter
#telemetry.otel.tracer.exporter.batch_size: 1
#telemetry.otel.tracer.exporter.max_queue_size: 3
ulimits:
memlock:
soft: -1
Expand All @@ -50,21 +50,6 @@ services:
networks:
- ubi-dev-os-net

# ubi-dev-os-dashboards:
# image: opensearchproject/opensearch-dashboards:2.12.0
# container_name: ubi-dev-os-dashboards
# ports:
# - 5601:5601
# expose:
# - 5601
# environment:
# OPENSEARCH_HOSTS: '["http://ubi-dev-os:9200"]'
# DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
# depends_on:
# - ubi-dev-os
# networks:
# - ubi-dev-os-net

networks:
ubi-dev-os-net:
driver: bridge
Empty file modified scripts/get-indexed-queries.sh
100644 → 100755
Empty file.
8 changes: 8 additions & 0 deletions scripts/msearch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash -e

curl -s -X GET "http://localhost:9200/_msearch" -H 'Content-Type: application/json' -d'
{ "index": "ecommerce"}
{ "query": { "match_all": {} }, "ext": { "ubi": { "query_id": "11111" } } }
{ "index": "ecommerce"}
{ "query": { "match_all": {} }, "ext": { "ubi": { "query_id": "22222" } } }
'
119 changes: 68 additions & 51 deletions src/main/java/org/opensearch/ubi/UbiActionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilter;
Expand Down Expand Up @@ -96,7 +98,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
ActionFilterChain<Request, Response> chain
) {

if (!(request instanceof SearchRequest)) {
if (!(request instanceof SearchRequest || request instanceof MultiSearchRequest)) {
chain.proceed(task, action, request, listener);
return;
}
Expand All @@ -106,84 +108,99 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
@Override
public void onResponse(Response response) {

final SearchRequest searchRequest = (SearchRequest) request;
if (request instanceof MultiSearchRequest) {

if (response instanceof SearchResponse) {
final MultiSearchRequest multiSearchRequest = (MultiSearchRequest) request;

final UbiParameters ubiParameters = UbiParameters.getUbiParameters(searchRequest);
for(final SearchRequest searchRequest : multiSearchRequest.requests()) {
handleSearchRequest(searchRequest, response);
}

if (ubiParameters != null) {
}

final String queryId = ubiParameters.getQueryId();
final String userQuery = ubiParameters.getUserQuery();
final String userId = ubiParameters.getClientId();
final String objectIdField = ubiParameters.getObjectIdField();
final Map<String, String> queryAttributes = ubiParameters.getQueryAttributes();
if(request instanceof SearchRequest) {
response = (Response) handleSearchRequest((SearchRequest) request, response);
}

// TODO: Ignore the UBI in ext.
final String query = searchRequest.source().toString();
listener.onResponse(response);

final List<String> queryResponseHitIds = new LinkedList<>();
}

for (final SearchHit hit : ((SearchResponse) response).getHits()) {
@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}

if (objectIdField == null || objectIdField.isEmpty()) {
// Use the result's docId since no object_id was given for the search.
queryResponseHitIds.add(String.valueOf(hit.docId()));
} else {
final Map<String, Object> source = hit.getSourceAsMap();
queryResponseHitIds.add((String) source.get(objectIdField));
}
});

}
}

final String queryResponseId = UUID.randomUUID().toString();
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(queryId, userQuery, userId, query, queryAttributes, queryResponse);
private ActionResponse handleSearchRequest(final SearchRequest searchRequest, ActionResponse response) {

sendOtelTrace(task, tracer, queryRequest);
if (response instanceof SearchResponse) {

final String dataPrepperUrl = environment.settings().get(UbiSettings.DATA_PREPPER_URL);
if(dataPrepperUrl != null) {
sendToDataPrepper(dataPrepperUrl, queryRequest);
} else {
indexQuery(queryRequest);
}
final UbiParameters ubiParameters = UbiParameters.getUbiParameters(searchRequest);

SearchResponse searchResponse = (SearchResponse) response;
if (ubiParameters != null) {

response = (Response) new UbiSearchResponse(
searchResponse.getInternalResponse(),
searchResponse.getScrollId(),
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getTook().millis(),
searchResponse.getShardFailures(),
searchResponse.getClusters(),
queryId
);
final String queryId = ubiParameters.getQueryId();
final String userQuery = ubiParameters.getUserQuery();
final String userId = ubiParameters.getClientId();
final String objectIdField = ubiParameters.getObjectIdField();
final Map<String, String> queryAttributes = ubiParameters.getQueryAttributes();

final String query = searchRequest.source().toString();

final List<String> queryResponseHitIds = new LinkedList<>();

for (final SearchHit hit : ((SearchResponse) response).getHits()) {

if (objectIdField == null || objectIdField.isEmpty()) {
// Use the result's docId since no object_id was given for the search.
queryResponseHitIds.add(String.valueOf(hit.docId()));
} else {
final Map<String, Object> source = hit.getSourceAsMap();
queryResponseHitIds.add((String) source.get(objectIdField));
}

}

listener.onResponse(response);
final String queryResponseId = UUID.randomUUID().toString();
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(queryId, userQuery, userId, query, queryAttributes, queryResponse);

}
final String dataPrepperUrl = environment.settings().get(UbiSettings.DATA_PREPPER_URL);
if (dataPrepperUrl != null) {
sendToDataPrepper(dataPrepperUrl, queryRequest);
} else {
indexQuery(queryRequest);
}

final SearchResponse searchResponse = (SearchResponse) response;

response = new UbiSearchResponse(
searchResponse.getInternalResponse(),
searchResponse.getScrollId(),
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getTook().millis(),
searchResponse.getShardFailures(),
searchResponse.getClusters(),
queryId
);

@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}

});
}

return response;

}

private void sendToDataPrepper(final String dataPrepperUrl, final QueryRequest queryRequest) {

LOGGER.debug("Sending query to DataPrepper at " + dataPrepperUrl);
LOGGER.debug("Sending query to DataPrepper at {}", dataPrepperUrl);

// TODO: Do this in a background thread?
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
"Query":

- do:
indices.delete:
index: ubi_queries
ignore_unavailable: true

- do:
indices.create:
index: ecommerce1
body:
mappings:
{ "properties": { "category": { "type": "text" } } }

- match: { acknowledged: true }
- match: { index: "ecommerce1"}

- do:
indices.create:
index: ecommerce2
body:
mappings:
{ "properties": { "category": { "type": "text" } } }

- match: { acknowledged: true }
- match: { index: "ecommerce2"}

- do:
index:
index: ecommerce1
id: 1
body: { category: notebook }

- match: { result: created }

- do:
index:
index: ecommerce2
id: 1
body: { category: notebook }

- match: { result: created }

- do:
indices.refresh:
index: [ "ecommerce1" ]

- do:
indices.refresh:
index: [ "ecommerce2" ]

- do:
msearch:
rest_total_hits_as_int: true
body:
- index: ecommerce1
- {query: {match_all: {}}, "ext": {"ubi": {"query_id": "12345"}}}
- index: ecommerce2
- {query: {match_all: {}}, "ext": {"ubi": {"query_id": "12345"}}}

- match: { responses.0.hits.total: 1}
- match: { responses.1.hits.total: 1}

- do:
cluster.health:
index: [ubi_queries]
wait_for_no_initializing_shards: true

- do:
indices.exists:
index: ubi_queries

- is_true: ''

0 comments on commit 1e92b6d

Please sign in to comment.