diff --git a/CHANGELOG.md b/CHANGELOG.md index cefa3f4716d5f..437781f8ed2f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -136,6 +136,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Renaming to match merge to main branch ([5362](https://github.com/opensearch-project/OpenSearch/pull/5362)) - Updated settings registration changes to reflect main ([5532](https://github.com/opensearch-project/OpenSearch/pull/5532)) - Added dependency information to Extensions ([#5438](https://github.com/opensearch-project/OpenSearch/pull/5438)) + - Adding request response model to fetch job details from extensions ([#5586](https://github.com/opensearch-project/OpenSearch/pull/5586)) ## [Unreleased 2.x] ### Added diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionStringResponseHandler.java b/server/src/main/java/org/opensearch/extensions/ExtensionStringResponseHandler.java new file mode 100644 index 0000000000000..a9fcdf07bf0de --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionStringResponseHandler.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * ExtensionStringResponseHandler to handle ExtensionStringResponse from extensions + * + * @opensearch.internal + */ +public class ExtensionStringResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(ExtensionStringResponseHandler.class); + final CompletableFuture inProgressFuture = new CompletableFuture<>(); + private String response; + + public ExtensionStringResponseHandler(String response) { + this.response = response; + } + + @Override + public ExtensionStringResponse read(StreamInput in) throws IOException { + return new ExtensionStringResponse(in); + } + + @Override + public void handleResponse(ExtensionStringResponse response) { + logger.info("Response from extension " + response); + this.response = response.getResponse(); + inProgressFuture.complete(response); + } + + @Override + public void handleException(TransportException exp) { + logger.error(new ParameterizedMessage("Fetch Job Details from extension failed"), exp); + inProgressFuture.completeExceptionally(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + public String getResponse() { + return response; + } + + public void setResponse(String response) { + this.response = response; + } + + @Override + public String toString() { + return "ExtensionStringResponseHandler{" + "response='" + response + '\'' + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 86293d38fcfd7..8d8696099cd39 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -87,6 +87,8 @@ public class ExtensionsManager { public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction"; public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction"; public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension"; + public static final String JOB_INDEX_REQUEST_FROM_EXTENSION = "internal:extensions/job-index-from-extension"; + public static final String JOB_TYPE_REQUEST_FROM_EXTENSION = "internal:extensions/job-type-from-extension"; public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10; private static final Logger logger = LogManager.getLogger(ExtensionsManager.class); @@ -105,7 +107,9 @@ public static enum RequestType { REQUEST_EXTENSION_ENVIRONMENT_SETTINGS, CREATE_COMPONENT, ON_INDEX_MODULE, - GET_SETTINGS + GET_SETTINGS, + JOB_INDEX_REQUEST_FROM_EXTENSION, + JOB_TYPE_REQUEST_FROM_EXTENSION }; /** @@ -133,6 +137,8 @@ public static enum OpenSearchRequestType { private Settings environmentSettings; private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; private NodeClient client; + private Map jobDetailsMap; + private ExtensionStringResponseHandler extensionStringResponseHandler; /** * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. @@ -153,6 +159,7 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept this.namedWriteableRegistry = null; this.client = null; this.extensionTransportActionsHandler = null; + this.jobDetailsMap = new HashMap<>(); /* * Now Discover extensions @@ -358,6 +365,7 @@ private void loadExtension(Extension extension) throws IOException { public void initialize() { for (DiscoveryExtensionNode extension : extensionIdMap.values()) { initializeExtension(extension); + fetchJobDetails(extension); } this.namedWriteableRegistry = new ExtensionNamedWriteableRegistry(extensions, transportService); } @@ -414,6 +422,53 @@ public String executor() { } } + private void fetchJobDetails(DiscoveryExtensionNode extension) { + String jobType = null; + String jobIndex = null; + try { + logger.info("Sending extension request type: " + JOB_TYPE_REQUEST_FROM_EXTENSION); + transportService.connectToExtensionNode(extension); + this.extensionStringResponseHandler = new ExtensionStringResponseHandler(jobType); + transportService.sendRequest( + extension, + JOB_TYPE_REQUEST_FROM_EXTENSION, + new ExtensionRequest(RequestType.JOB_TYPE_REQUEST_FROM_EXTENSION), + extensionStringResponseHandler + ); + extensionStringResponseHandler.inProgressFuture.join(); + jobType = extensionStringResponseHandler.getResponse(); + + } catch (Exception e) { + try { + throw e; + } catch (Exception e1) { + logger.error(e.toString()); + } + } + + try { + logger.info("Sending extension request type: " + JOB_INDEX_REQUEST_FROM_EXTENSION); + this.extensionStringResponseHandler = new ExtensionStringResponseHandler(jobIndex); + transportService.sendRequest( + extension, + JOB_INDEX_REQUEST_FROM_EXTENSION, + new ExtensionRequest(RequestType.JOB_INDEX_REQUEST_FROM_EXTENSION), + extensionStringResponseHandler + ); + extensionStringResponseHandler.inProgressFuture.join(); + jobIndex = extensionStringResponseHandler.getResponse(); + + } catch (Exception e) { + try { + throw e; + } catch (Exception e1) { + logger.error(e.toString()); + } + } + + jobDetailsMap.put(extension.getId(), new JobDetails(jobType, jobIndex)); + } + /** * Handles an {@link ExtensionRequest}. * @@ -622,6 +677,14 @@ public static int getExtensionRequestWaitTimeout() { return EXTENSION_REQUEST_WAIT_TIMEOUT; } + public static String getJobIndexRequestFromExtension() { + return JOB_INDEX_REQUEST_FROM_EXTENSION; + } + + public static String getJobTypeRequestFromExtension() { + return JOB_TYPE_REQUEST_FROM_EXTENSION; + } + public static Logger getLogger() { return logger; } @@ -634,6 +697,10 @@ public ExtensionTransportActionsHandler getExtensionTransportActionsHandler() { return extensionTransportActionsHandler; } + public ExtensionStringResponseHandler getExtensionStringResponseHandler() { + return extensionStringResponseHandler; + } + public List getExtensions() { return extensions; } diff --git a/server/src/main/java/org/opensearch/extensions/JobDetails.java b/server/src/main/java/org/opensearch/extensions/JobDetails.java new file mode 100644 index 0000000000000..6d1bab47e84ed --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/JobDetails.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * The object will contain job details of extension. + * + * @opensearch.internal + */ +public class JobDetails implements Writeable { + + private String jobType; + + private String jobIndex; + + public JobDetails(String jobType, String jobIndex) { + this.jobType = jobType; + this.jobIndex = jobIndex; + } + + public JobDetails(StreamInput in) throws IOException { + if (in.available() > 0) { + this.jobType = in.readString(); + this.jobIndex = in.readString(); + } + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getJobIndex() { + return jobIndex; + } + + public void setJobIndex(String jobIndex) { + this.jobIndex = jobIndex; + } + + @Override + public String toString() { + return "JobDetails [jobType=" + jobType + ", jobIndex=" + jobIndex + "]"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(jobType); + out.writeString(jobIndex); + + } +} diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionStringResponseHandlerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionStringResponseHandlerTests.java new file mode 100644 index 0000000000000..571330392dc2c --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/ExtensionStringResponseHandlerTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +public class ExtensionStringResponseHandlerTests extends OpenSearchTestCase { + + private ExtensionStringResponseHandler extensionStringResponseHandler; + + @Before + public void setup() throws Exception { + extensionStringResponseHandler = new ExtensionStringResponseHandler("Sample-string"); + } + + public void testReadResponse() throws Exception { + String expected = "Sample-string"; + StreamInput in = Mockito.mock(StreamInput.class); + Mockito.when(in.readString()).thenReturn("Sample-string"); + ExtensionStringResponse extensionStringResponse = extensionStringResponseHandler.read(in); + assertEquals(expected, extensionStringResponse.getResponse()); + + } + + public void testHandleResponse() throws Exception { + String expected = "Sample-string"; + ExtensionStringResponse response = new ExtensionStringResponse("Sample-string"); + extensionStringResponseHandler.handleResponse(response); + assertEquals(expected, response.getResponse()); + } + + public void testExecutor() { + String expected = "generic"; + String response = extensionStringResponseHandler.executor(); + assertEquals(expected, response); + } +}