Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Communication mechanism JS (Part 1) #5586

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Renaming to match merge to main branch ([5362](https://github.com/opensearch-project/OpenSearch/pull/5362))
- Updated settings registration changes to reflect main ([5532](https://github.com/opensearch-project/OpenSearch/pull/5532))
- Added dependency information to Extensions ([#5438](https://github.com/opensearch-project/OpenSearch/pull/5438))
- Adding request response model to fetch job details from extensions ([#5586](https://github.com/opensearch-project/OpenSearch/pull/5586))

## [Unreleased 2.x]
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class ExtensionsManager {
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final String JOB_DETAILS_REQUEST_FROM_EXTENSION = "internal:extensions/job-details-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

private static final Logger logger = LogManager.getLogger(ExtensionsManager.class);
Expand All @@ -105,7 +106,8 @@ public static enum RequestType {
REQUEST_EXTENSION_ENVIRONMENT_SETTINGS,
CREATE_COMPONENT,
ON_INDEX_MODULE,
GET_SETTINGS
GET_SETTINGS,
JOB_DETAILS_REQUEST_FROM_EXTENSION
};

/**
Expand Down Expand Up @@ -133,6 +135,8 @@ public static enum OpenSearchRequestType {
private Settings environmentSettings;
private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler;
private NodeClient client;
private Map<String, JobDetails> jobDetailsMap;
private JobDetailsResponseHandler jobDetailsResponseHandler;

/**
* Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap.
Expand All @@ -153,6 +157,7 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept
this.namedWriteableRegistry = null;
this.client = null;
this.extensionTransportActionsHandler = null;
this.jobDetailsMap = new HashMap<>();

/*
* Now Discover extensions
Expand Down Expand Up @@ -358,6 +363,7 @@ private void loadExtension(Extension extension) throws IOException {
public void initialize() {
for (DiscoveryExtensionNode extension : extensionIdMap.values()) {
initializeExtension(extension);
fetchJobDetails(extension);
}
this.namedWriteableRegistry = new ExtensionNamedWriteableRegistry(extensions, transportService);
}
Expand Down Expand Up @@ -414,6 +420,28 @@ public String executor() {
}
}

private void fetchJobDetails(DiscoveryExtensionNode extension) {
try {
logger.info("Sending extension request type: " + JOB_DETAILS_REQUEST_FROM_EXTENSION);
transportService.connectToExtensionNode(extension);
this.jobDetailsResponseHandler = new JobDetailsResponseHandler(jobDetailsMap, extension.getId());
transportService.sendRequest(
extension,
JOB_DETAILS_REQUEST_FROM_EXTENSION,
new ExtensionRequest(RequestType.JOB_DETAILS_REQUEST_FROM_EXTENSION),
jobDetailsResponseHandler
);

jobDetailsResponseHandler.inProgressFuture.join();
} catch (Exception e) {
try {
throw e;
} catch (Exception e1) {
logger.error(e.toString());
}
}
}

/**
* Handles an {@link ExtensionRequest}.
*
Expand Down Expand Up @@ -622,6 +650,10 @@ public static int getExtensionRequestWaitTimeout() {
return EXTENSION_REQUEST_WAIT_TIMEOUT;
}

public static String getJobDetailsRequestFromExtension() {
return JOB_DETAILS_REQUEST_FROM_EXTENSION;
}

public static Logger getLogger() {
return logger;
}
Expand Down Expand Up @@ -670,6 +702,10 @@ public ExtensionActionListenerHandler getListenerHandler() {
return listenerHandler;
}

public JobDetailsResponseHandler getJobDetailsHandler() {
return jobDetailsResponseHandler;
}

public Settings getEnvironmentSettings() {
return environmentSettings;
}
Expand Down
67 changes: 67 additions & 0 deletions server/src/main/java/org/opensearch/extensions/JobDetails.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

import java.io.IOException;

/**
* The object will contain job details of extension.
*
* @opensearch.internal
*/
public class JobDetails implements Writeable {

private String jobType;

private String jobIndex;

public JobDetails(String jobType, String jobIndex) {
this.jobType = jobType;
this.jobIndex = jobIndex;
}

public JobDetails(StreamInput in) throws IOException {
if (in.available() > 0) {
this.jobType = in.readString();
this.jobIndex = in.readString();
}
}

public String getJobType() {
return jobType;
}

public void setJobType(String jobType) {
this.jobType = jobType;
}

public String getJobIndex() {
return jobIndex;
}

public void setJobIndex(String jobIndex) {
this.jobIndex = jobIndex;
}

@Override
public String toString() {
return "JobDetails [jobType=" + jobType + ", jobIndex=" + jobIndex + "]";
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobType);
out.writeString(jobIndex);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* JobDetails Response from extensions
*
* @opensearch.internal
*/
public class JobDetailsResponse extends TransportResponse {

private JobDetails jobDetails;

public JobDetailsResponse(JobDetails jobDetails) {
this.jobDetails = jobDetails;
}

public JobDetailsResponse(StreamInput in) throws IOException {
super(in);
jobDetails = new JobDetails(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
jobDetails.writeTo(out);
}

public JobDetails getJobDetails() {
return jobDetails;
}

public void setJobDetails(JobDetails jobDetails) {
this.jobDetails = jobDetails;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobDetailsResponse that = (JobDetailsResponse) o;
return Objects.equals(jobDetails, that.jobDetails);
}

@Override
public int hashCode() {
return Objects.hash(jobDetails);
}

@Override
public String toString() {
return "JobDetailsResponse{" + "jobDetails=" + jobDetails + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* JobDetailsResponseHandler to handle JobDetailsResponse from extensions
*
* @opensearch.internal
*/
public class JobDetailsResponseHandler implements TransportResponseHandler<JobDetailsResponse> {
private static final Logger logger = LogManager.getLogger(JobDetailsResponseHandler.class);
final CompletableFuture<JobDetailsResponse> inProgressFuture = new CompletableFuture<>();
private Map<String, JobDetails> jobDetailsMap;

private String uniqueExtensionId;

public JobDetailsResponseHandler(Map<String, JobDetails> jobDetailsMap, String uniqueExtensionId) {
this.jobDetailsMap = jobDetailsMap;
this.uniqueExtensionId = uniqueExtensionId;
}

@Override
public JobDetailsResponse read(StreamInput in) throws IOException {
return new JobDetailsResponse(in);
}

@Override
public void handleResponse(JobDetailsResponse response) {
logger.info(response.getJobDetails().toString());
jobDetailsMap.put(uniqueExtensionId, response.getJobDetails());
inProgressFuture.complete(response);
}

@Override
public void handleException(TransportException exp) {
logger.error(new ParameterizedMessage("Fetch Job Details from extension failed"), exp);
inProgressFuture.completeExceptionally(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
}