From d4000443c71b1efd2c949a2c64a0a1c326a91e00 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Tue, 8 Nov 2022 16:55:51 +0000 Subject: [PATCH] fix no permission bug when update index mapping (#545) Signed-off-by: Yaliang Wu Signed-off-by: Yaliang Wu Signed-off-by: Sicheng Song --- .../ml/indices/MLIndicesHandler.java | 89 ++++++++++--------- 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java b/plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java index 9dc9aaa2a8..ce60d01c24 100644 --- a/plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java +++ b/plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java @@ -57,61 +57,62 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener listener) String indexName = index.getIndexName(); String mapping = index.getMapping(); - if (!clusterService.state().metadata().hasIndex(indexName)) { - try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener internalListener = ActionListener.runBefore(listener, () -> threadContext.restore()); + if (!clusterService.state().metadata().hasIndex(indexName)) { ActionListener actionListener = ActionListener.wrap(r -> { if (r.isAcknowledged()) { log.info("create index:{}", indexName); - listener.onResponse(true); + internalListener.onResponse(true); } else { - listener.onResponse(false); + internalListener.onResponse(false); } }, e -> { log.error("Failed to create index " + indexName, e); - listener.onFailure(e); + internalListener.onFailure(e); }); CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping); - client.admin().indices().create(request, ActionListener.runBefore(actionListener, () -> threadContext.restore())); - } catch (Exception e) { - log.error("Failed to init index " + indexName, e); - listener.onFailure(e); - } - } else { - log.debug("index:{} is already created", indexName); - if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { - shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { - if (r) { - // return true if should update index - client - .admin() - .indices() - .putMapping( - new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - indexMappingUpdated.get(indexName).set(true); - listener.onResponse(true); - } else { - listener.onFailure(new MLException("Failed to update index: " + indexName)); - } - }, exception -> { - log.error("Failed to update index " + indexName, exception); - listener.onFailure(exception); - }) - ); - } else { - // no need to update index if it does not exist or the version is already up-to-date. - indexMappingUpdated.get(indexName).set(true); - listener.onResponse(true); - } - }, e -> { - log.error("Failed to update index mapping", e); - listener.onFailure(e); - })); + client.admin().indices().create(request, actionListener); } else { - // No need to update index if it's not ML system index or it's already updated. - listener.onResponse(true); + log.info("index:{} is already created", indexName); + if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { + shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { + if (r) { + // return true if should update index + client + .admin() + .indices() + .putMapping( + new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } else { + internalListener.onFailure(new MLException("Failed to update index: " + indexName)); + } + }, exception -> { + log.error("Failed to update index " + indexName, exception); + internalListener.onFailure(exception); + }) + ); + } else { + // no need to update index if it does not exist or the version is already up-to-date. + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } + }, e -> { + log.error("Failed to update index mapping", e); + internalListener.onFailure(e); + })); + } else { + // No need to update index if it's not ML system index or it's already updated. + internalListener.onResponse(true); + } } + } catch (Exception e) { + log.error("Failed to init index " + indexName, e); + listener.onFailure(e); } }