Skip to content

Commit

Permalink
removing experimental from the Conversation memory feature (#2592)
Browse files Browse the repository at this point in the history
* removing experimental from the Conversation memory feature

Signed-off-by: Dhrubo Saha <[email protected]>

* reflecting another unit test

Signed-off-by: Dhrubo Saha <[email protected]>

---------

Signed-off-by: Dhrubo Saha <[email protected]>
  • Loading branch information
dhrubo-os authored Jul 1, 2024
1 parent f61ab17 commit c4cf1b2
Show file tree
Hide file tree
Showing 21 changed files with 82 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ConversationalIndexConstants {
/** Mappings for the conversational metadata index */
public final static String META_MAPPING = "{\n"
+ " \"_meta\": {\n"
+ " \"schema_version\": " + META_INDEX_SCHEMA_VERSION + "\n"
+ " \"schema_version\": " + META_INDEX_SCHEMA_VERSION + "\n"
+ " },\n"
+ " \"properties\": {\n"
+ " \""
Expand Down Expand Up @@ -86,7 +86,7 @@ public class ConversationalIndexConstants {
/** Mappings for the interactions index */
public final static String INTERACTIONS_MAPPINGS = "{\n"
+ " \"_meta\": {\n"
+ " \"schema_version\": " + INTERACTIONS_INDEX_SCHEMA_VERSION + "\n"
+ " \"schema_version\": " + INTERACTIONS_INDEX_SCHEMA_VERSION + "\n"
+ " },\n"
+ " \"properties\": {\n"
+ " \""
Expand Down Expand Up @@ -122,4 +122,6 @@ public class ConversationalIndexConstants {
/** Feature Flag setting for conversational memory */
public static final Setting<Boolean> ML_COMMONS_MEMORY_FEATURE_ENABLED = Setting
.boolSetting("plugins.ml_commons.memory_feature_enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
}

public static final String ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE = "The Conversation Memory feature is not enabled. To enable, please update the setting " + ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand Down Expand Up @@ -72,13 +74,7 @@ public CreateConversationTransportAction(
@Override
protected void doExecute(Task task, CreateConversationRequest request, ActionListener<CreateConversationResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
String name = request.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -44,8 +46,8 @@
@Log4j2
public class CreateInteractionTransportAction extends HandledTransportAction<CreateInteractionRequest, CreateInteractionResponse> {

private ConversationalMemoryHandler cmHandler;
private Client client;
private final ConversationalMemoryHandler cmHandler;
private final Client client;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -77,13 +79,7 @@ public CreateInteractionTransportAction(
@Override
protected void doExecute(Task task, CreateInteractionRequest request, ActionListener<CreateInteractionResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
String cid = request.getConversationId();
Expand All @@ -95,18 +91,18 @@ protected void doExecute(Task task, CreateInteractionRequest request, ActionList
String parintIid = request.getParentIid();
Integer traceNumber = request.getTraceNumber();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<CreateInteractionResponse> internalListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<CreateInteractionResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
ActionListener<String> al = ActionListener.wrap(iid -> {
cmHandler.updateConversation(cid, new HashMap<>(), getUpdateResponseListener(cid, iid, internalListener));
log.info("Updating the memory {} after the message {} is created", cid, iid);
}, e -> { internalListener.onFailure(e); });
}, internalListener::onFailure);
if (parintIid == null || traceNumber == null) {
cmHandler.createInteraction(cid, inp, prompt, rsp, ogn, additionalInfo, al);
} else {
cmHandler.createInteraction(cid, inp, prompt, rsp, ogn, additionalInfo, al, parintIid, traceNumber);
}
} catch (Exception e) {
log.error("Failed to create message for memory " + cid, e);
log.error("Failed to create message for memory {}", cid, e);
actionListener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand Down Expand Up @@ -72,13 +74,7 @@ public DeleteConversationTransportAction(
@Override
public void doExecute(Task task, DeleteConversationRequest request, ActionListener<DeleteConversationResponse> listener) {
if (!featureIsEnabled) {
listener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
listener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
String conversationId = request.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand All @@ -39,8 +41,8 @@
*/
@Log4j2
public class GetConversationTransportAction extends HandledTransportAction<GetConversationRequest, GetConversationResponse> {
private Client client;
private ConversationalMemoryHandler cmHandler;
private final Client client;
private final ConversationalMemoryHandler cmHandler;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -72,25 +74,18 @@ public GetConversationTransportAction(
@Override
public void doExecute(Task task, GetConversationRequest request, ActionListener<GetConversationResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
} else {
String conversationId = request.getConversationId();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<GetConversationResponse> internalListener = ActionListener
.runBefore(actionListener, () -> context.restore());
ActionListener<GetConversationResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
ActionListener<ConversationMeta> al = ActionListener.wrap(conversationMeta -> {
internalListener.onResponse(new GetConversationResponse(conversationMeta));
}, e -> { internalListener.onFailure(e); });
}, internalListener::onFailure);
cmHandler.getConversation(conversationId, al);
} catch (Exception e) {
log.error("Failed to get memory " + conversationId, e);
log.error("Failed to get memory {}", conversationId, e);
actionListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import java.util.List;

import org.opensearch.OpenSearchException;
Expand All @@ -42,8 +44,8 @@
@Log4j2
public class GetConversationsTransportAction extends HandledTransportAction<GetConversationsRequest, GetConversationsResponse> {

private Client client;
private ConversationalMemoryHandler cmHandler;
private final Client client;
private final ConversationalMemoryHandler cmHandler;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -75,19 +77,13 @@ public GetConversationsTransportAction(
@Override
public void doExecute(Task task, GetConversationsRequest request, ActionListener<GetConversationsResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
int maxResults = request.getMaxResults();
int from = request.getFrom();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<GetConversationsResponse> internalListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<GetConversationsResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
ActionListener<List<ConversationMeta>> al = ActionListener.wrap(conversations -> {
internalListener
.onResponse(new GetConversationsResponse(conversations, from + maxResults, conversations.size() == maxResults));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand All @@ -37,8 +39,8 @@
@Log4j2
public class GetInteractionTransportAction extends HandledTransportAction<GetInteractionRequest, GetInteractionResponse> {

private Client client;
private ConversationalMemoryHandler cmHandler;
private final Client client;
private final ConversationalMemoryHandler cmHandler;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -70,24 +72,18 @@ public GetInteractionTransportAction(
@Override
public void doExecute(Task task, GetInteractionRequest request, ActionListener<GetInteractionResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
String interactionId = request.getInteractionId();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<GetInteractionResponse> internalListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<GetInteractionResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
ActionListener<Interaction> al = ActionListener.wrap(interaction -> {
internalListener.onResponse(new GetInteractionResponse(interaction));
}, e -> { internalListener.onFailure(e); });
}, internalListener::onFailure);
cmHandler.getInteraction(interactionId, al);
} catch (Exception e) {
log.error("Failed to get message " + interactionId, e);
log.error("Failed to get message {}", interactionId, e);
actionListener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import java.util.List;

import org.opensearch.OpenSearchException;
Expand All @@ -42,8 +44,8 @@
@Log4j2
public class GetInteractionsTransportAction extends HandledTransportAction<GetInteractionsRequest, GetInteractionsResponse> {

private Client client;
private ConversationalMemoryHandler cmHandler;
private final Client client;
private final ConversationalMemoryHandler cmHandler;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -75,26 +77,20 @@ public GetInteractionsTransportAction(
@Override
public void doExecute(Task task, GetInteractionsRequest request, ActionListener<GetInteractionsResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
}
int maxResults = request.getMaxResults();
int from = request.getFrom();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<GetInteractionsResponse> internalListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<GetInteractionsResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
ActionListener<List<Interaction>> al = ActionListener.wrap(interactions -> {
internalListener
.onResponse(new GetInteractionsResponse(interactions, from + maxResults, interactions.size() == maxResults));
}, e -> { internalListener.onFailure(e); });
}, internalListener::onFailure);
cmHandler.getInteractions(request.getConversationId(), from, maxResults, al);
} catch (Exception e) {
log.error("Failed to get messages for memory " + request.getConversationId(), e);
log.error("Failed to get messages for memory {}", request.getConversationId(), e);
actionListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand Down Expand Up @@ -71,13 +73,7 @@ public SearchConversationsTransportAction(
@Override
public void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.opensearch.ml.memory.action.conversation;

import static org.opensearch.ml.common.conversation.ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE;

import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -37,8 +39,8 @@
@Log4j2
public class SearchInteractionsTransportAction extends HandledTransportAction<SearchInteractionsRequest, SearchResponse> {

private ConversationalMemoryHandler cmHandler;
private Client client;
private final ConversationalMemoryHandler cmHandler;
private final Client client;

private volatile boolean featureIsEnabled;

Expand Down Expand Up @@ -70,17 +72,11 @@ public SearchInteractionsTransportAction(
@Override
public void doExecute(Task task, SearchInteractionsRequest request, ActionListener<SearchResponse> actionListener) {
if (!featureIsEnabled) {
actionListener
.onFailure(
new OpenSearchException(
"The experimental Conversation Memory feature is not enabled. To enable, please update the setting "
+ ConversationalIndexConstants.ML_COMMONS_MEMORY_FEATURE_ENABLED.getKey()
)
);
actionListener.onFailure(new OpenSearchException(ML_COMMONS_MEMORY_FEATURE_DISABLED_MESSAGE));
return;
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
ActionListener<SearchResponse> internalListener = ActionListener.runBefore(actionListener, () -> context.restore());
ActionListener<SearchResponse> internalListener = ActionListener.runBefore(actionListener, context::restore);
cmHandler.searchInteractions(request.getConversationId(), request, internalListener);
} catch (Exception e) {
log.error("Failed to search memories", e);
Expand Down
Loading

0 comments on commit c4cf1b2

Please sign in to comment.