From 4297e9197bdd71b523229df3a90135d489f084a6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 20 Nov 2024 22:55:52 +0000 Subject: [PATCH 1/4] Clean up `TransportRemoteClusterStatsAction` (#117119) No need to have an `ActionType<>` here since we never register this as an action the `Client` can invoke. Also no need to use a dummy constructor parameter just to trick the injector into instantiating it, we can instantiate it ourselves like we do with all other subsidiary transport-only actions. Also fixes the parent task so the remote action is a child of the local action rather than a sibling. --- .../cluster/stats/ClusterStatsRequest.java | 7 ++++--- .../stats/TransportClusterStatsAction.java | 10 ++++++---- .../TransportRemoteClusterStatsAction.java | 18 +++++++++--------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index d8db2c5e657b4..ce9b48666d6ed 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -48,9 +48,10 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new CancellableTask(id, type, action, "", parentTaskId, headers); } - public ClusterStatsRequest asRemoteStats() { - this.remoteStats = true; - return this; + public static ClusterStatsRequest newRemoteClusterStatsRequest() { + final var request = new ClusterStatsRequest(); + request.remoteStats = true; + return request; } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 36b018b5002eb..97585ea9a1024 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterSnapshotStats; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -108,20 +109,19 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; private final RemoteClusterService remoteClusterService; - private final TransportRemoteClusterStatsAction remoteClusterStatsAction; @Inject public TransportClusterStatsAction( ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + Client client, NodeService nodeService, IndicesService indicesService, RepositoriesService repositoriesService, UsageService usageService, ActionFilters actionFilters, - Settings settings, - TransportRemoteClusterStatsAction remoteClusterStatsAction + Settings settings ) { super( TYPE.name(), @@ -141,7 +141,9 @@ public TransportClusterStatsAction( this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); this.remoteClusterService = transportService.getRemoteClusterService(); this.settings = settings; - this.remoteClusterStatsAction = remoteClusterStatsAction; + + // register remote-cluster action with transport service only and not as a local-node Action that the Client can invoke + new TransportRemoteClusterStatsAction(client, transportService, actionFilters); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java index 4d57f10807af6..882aaa8b18e15 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java @@ -10,11 +10,11 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -27,26 +27,26 @@ public class TransportRemoteClusterStatsAction extends HandledTransportAction { public static final String NAME = "cluster:monitor/stats/remote"; - public static final ActionType TYPE = new ActionType<>(NAME); public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( NAME, RemoteClusterStatsResponse::new ); - private final NodeClient client; + + private final Client client; + private final TransportService transportService; @Inject - public TransportRemoteClusterStatsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + public TransportRemoteClusterStatsAction(Client client, TransportService transportService, ActionFilters actionFilters) { super(NAME, transportService, actionFilters, RemoteClusterStatsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.client = client; + this.transportService = transportService; } @Override protected void doExecute(Task task, RemoteClusterStatsRequest request, ActionListener listener) { - ClusterStatsRequest subRequest = new ClusterStatsRequest().asRemoteStats(); - subRequest.setParentTask(request.getParentTask()); - client.execute( + new ParentTaskAssigningClient(client, transportService.getLocalNode(), task).execute( TransportClusterStatsAction.TYPE, - subRequest, + ClusterStatsRequest.newRemoteClusterStatsRequest(), listener.map( clusterStatsResponse -> new RemoteClusterStatsResponse( clusterStatsResponse.getClusterUUID(), From 4f46924f36640e684abccfbc3656b7335908136e Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Wed, 20 Nov 2024 15:05:42 -0800 Subject: [PATCH 2/4] Split plugin loading into two different phases to support entitlements (#116998) This change loads all the modules and creates the module layers for plugins prior to entitlement checking during the 2nd phase of bootstrap initialization. This will allow us to know what modules exist for both validation and checking prior to actually loading any plugin classes (in a follow up change). There are now two classes: PluginsLoader which does the module loading and layer creation PluginsService which uses a PluginsLoader to create the main plugin classes and start the plugins --- .../script/ScriptScoreBenchmark.java | 4 +- .../elasticsearch/bootstrap/Bootstrap.java | 12 + .../bootstrap/Elasticsearch.java | 6 +- .../java/org/elasticsearch/node/Node.java | 5 +- .../elasticsearch/node/NodeConstruction.java | 8 +- .../node/NodeServiceProvider.java | 5 +- .../elasticsearch/plugins/PluginsLoader.java | 461 ++++++++++++++++++ .../elasticsearch/plugins/PluginsService.java | 425 ++-------------- .../plugins/PluginsLoaderTests.java | 31 ++ .../plugins/PluginsServiceTests.java | 23 +- .../java/org/elasticsearch/node/MockNode.java | 8 +- .../plugins/MockPluginsService.java | 18 +- .../bench/WatcherScheduleEngineBenchmark.java | 22 +- 13 files changed, 591 insertions(+), 437 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/plugins/PluginsLoader.java create mode 100644 server/src/test/java/org/elasticsearch/plugins/PluginsLoaderTests.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/script/ScriptScoreBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/script/ScriptScoreBenchmark.java index 3790be5f279d1..d44586ef4901a 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/script/ScriptScoreBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/script/ScriptScoreBenchmark.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.script.DocReader; @@ -76,8 +77,7 @@ public class ScriptScoreBenchmark { private final PluginsService pluginsService = new PluginsService( Settings.EMPTY, null, - null, - Path.of(System.getProperty("plugins.dir")) + new PluginsLoader(null, Path.of(System.getProperty("plugins.dir"))) ); private final ScriptModule scriptModule = new ScriptModule(Settings.EMPTY, pluginsService.filterPlugins(ScriptPlugin.class).toList()); diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java b/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java index 699198a8e22c2..56d185645e149 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.env.Environment; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.PluginsLoader; import java.io.PrintStream; @@ -42,6 +43,9 @@ class Bootstrap { // the loaded settings for the node, not valid until after phase 2 of initialization private final SetOnce nodeEnv = new SetOnce<>(); + // loads information about plugins required for entitlements in phase 2, used by plugins service in phase 3 + private final SetOnce pluginsLoader = new SetOnce<>(); + Bootstrap(PrintStream out, PrintStream err, ServerArgs args) { this.out = out; this.err = err; @@ -72,6 +76,14 @@ Environment environment() { return nodeEnv.get(); } + void setPluginsLoader(PluginsLoader pluginsLoader) { + this.pluginsLoader.set(pluginsLoader); + } + + PluginsLoader pluginsLoader() { + return pluginsLoader.get(); + } + void exitWithNodeValidationException(NodeValidationException e) { Logger logger = LogManager.getLogger(Elasticsearch.class); logger.error("node validation exception\n{}", e.getMessage()); diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java index 2a83f749e7d33..77875e65ab9b8 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java @@ -41,6 +41,7 @@ import org.elasticsearch.nativeaccess.NativeAccess; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.PluginsLoader; import java.io.IOException; import java.io.InputStream; @@ -199,6 +200,9 @@ private static void initPhase2(Bootstrap bootstrap) throws IOException { VectorUtil.class ); + // load the plugin Java modules and layers now for use in entitlements + bootstrap.setPluginsLoader(new PluginsLoader(nodeEnv.modulesFile(), nodeEnv.pluginsFile())); + if (Boolean.parseBoolean(System.getProperty("es.entitlements.enabled"))) { logger.info("Bootstrapping Entitlements"); EntitlementBootstrap.bootstrap(); @@ -244,7 +248,7 @@ private static void ensureInitialized(Class... classes) { private static void initPhase3(Bootstrap bootstrap) throws IOException, NodeValidationException { checkLucene(); - Node node = new Node(bootstrap.environment()) { + Node node = new Node(bootstrap.environment(), bootstrap.pluginsLoader()) { @Override protected void validateNodeBeforeAcceptingRequests( final BootstrapContext context, diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index ec4a534fc883b..80c9aafaa84b4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -69,6 +69,7 @@ import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.MetadataUpgrader; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.RepositoriesService; @@ -180,8 +181,8 @@ public class Node implements Closeable { * * @param environment the initial environment for this node, which will be added to by plugins */ - public Node(Environment environment) { - this(NodeConstruction.prepareConstruction(environment, new NodeServiceProvider(), true)); + public Node(Environment environment, PluginsLoader pluginsLoader) { + this(NodeConstruction.prepareConstruction(environment, pluginsLoader, new NodeServiceProvider(), true)); } /** diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index caf65c05cf27d..e1fc586424dec 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -164,6 +164,7 @@ import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.RecoveryPlannerPlugin; import org.elasticsearch.plugins.ReloadablePlugin; @@ -260,6 +261,7 @@ class NodeConstruction { */ static NodeConstruction prepareConstruction( Environment initialEnvironment, + PluginsLoader pluginsLoader, NodeServiceProvider serviceProvider, boolean forbidPrivateIndexSettings ) { @@ -267,7 +269,7 @@ static NodeConstruction prepareConstruction( try { NodeConstruction constructor = new NodeConstruction(closeables); - Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider); + Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider, pluginsLoader); constructor.loadLoggingDataProviders(); TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings); ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry()); @@ -400,7 +402,7 @@ private static Optional getSinglePlugin(Stream plugins, Class plugi return Optional.of(plugin); } - private Settings createEnvironment(Environment initialEnvironment, NodeServiceProvider serviceProvider) { + private Settings createEnvironment(Environment initialEnvironment, NodeServiceProvider serviceProvider, PluginsLoader pluginsLoader) { // Pass the node settings to the DeprecationLogger class so that it can have the deprecation.skip_deprecated_settings setting: Settings envSettings = initialEnvironment.settings(); DeprecationLogger.initialize(envSettings); @@ -473,7 +475,7 @@ private Settings createEnvironment(Environment initialEnvironment, NodeServicePr (e, apmConfig) -> logger.error("failed to delete temporary APM config file [{}], reason: [{}]", apmConfig, e.getMessage()) ); - pluginsService = serviceProvider.newPluginService(initialEnvironment, envSettings); + pluginsService = serviceProvider.newPluginService(initialEnvironment, pluginsLoader); modules.bindToInstance(PluginsService.class, pluginsService); Settings settings = Node.mergePluginSettings(pluginsService.pluginMap(), envSettings); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index f18655afb8f02..8f2dc4e532ae0 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -27,6 +27,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.script.ScriptContext; @@ -51,9 +52,9 @@ */ class NodeServiceProvider { - PluginsService newPluginService(Environment environment, Settings settings) { + PluginsService newPluginService(Environment initialEnvironment, PluginsLoader pluginsLoader) { // this creates a PluginsService with an empty list of classpath plugins - return new PluginsService(settings, environment.configFile(), environment.modulesFile(), environment.pluginsFile()); + return new PluginsService(initialEnvironment.settings(), initialEnvironment.configFile(), pluginsLoader); } ScriptService newScriptService( diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsLoader.java b/server/src/main/java/org/elasticsearch/plugins/PluginsLoader.java new file mode 100644 index 0000000000000..6b3eda6c0c9b4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsLoader.java @@ -0,0 +1,461 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.plugins; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.core.PathUtils; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.jdk.JarHell; +import org.elasticsearch.jdk.ModuleQualifiedExportsService; + +import java.io.IOException; +import java.lang.ModuleLayer.Controller; +import java.lang.module.Configuration; +import java.lang.module.ModuleFinder; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory; +import static org.elasticsearch.jdk.ModuleQualifiedExportsService.addExportsService; +import static org.elasticsearch.jdk.ModuleQualifiedExportsService.exposeQualifiedExportsAndOpens; + +/** + * This class is used to load modules and module layers for each plugin during + * node initialization prior to enablement of entitlements. This allows entitlements + * to have all the plugin information they need prior to starting. + */ +public class PluginsLoader { + + /** + * Contains information about the {@link ClassLoader} required to load a plugin + */ + public interface PluginLayer { + /** + * @return Information about the bundle of jars used in this plugin + */ + PluginBundle pluginBundle(); + + /** + * @return The {@link ClassLoader} used to instantiate the main class for the plugin + */ + ClassLoader pluginClassLoader(); + } + + /** + * Contains information about the {@link ClassLoader}s and {@link ModuleLayer} required for loading a plugin + * @param pluginBundle Information about the bundle of jars used in this plugin + * @param pluginClassLoader The {@link ClassLoader} used to instantiate the main class for the plugin + * @param spiClassLoader The exported {@link ClassLoader} visible to other Java modules + * @param spiModuleLayer The exported {@link ModuleLayer} visible to other Java modules + */ + private record LoadedPluginLayer( + PluginBundle pluginBundle, + ClassLoader pluginClassLoader, + ClassLoader spiClassLoader, + ModuleLayer spiModuleLayer + ) implements PluginLayer { + + public LoadedPluginLayer { + Objects.requireNonNull(pluginBundle); + Objects.requireNonNull(pluginClassLoader); + Objects.requireNonNull(spiClassLoader); + Objects.requireNonNull(spiModuleLayer); + } + } + + /** + * Tuple of module layer and loader. + * Modular Plugins have a plugin specific loader and layer. + * Non-Modular plugins have a plugin specific loader and the boot layer. + */ + public record LayerAndLoader(ModuleLayer layer, ClassLoader loader) { + + public LayerAndLoader { + Objects.requireNonNull(layer); + Objects.requireNonNull(loader); + } + + public static LayerAndLoader ofLoader(ClassLoader loader) { + return new LayerAndLoader(ModuleLayer.boot(), loader); + } + } + + private static final Logger logger = LogManager.getLogger(PluginsLoader.class); + private static final Module serverModule = PluginsLoader.class.getModule(); + + private final List moduleDescriptors; + private final List pluginDescriptors; + private final Map loadedPluginLayers; + + /** + * Constructs a new PluginsLoader + * + * @param modulesDirectory The directory modules exist in, or null if modules should not be loaded from the filesystem + * @param pluginsDirectory The directory plugins exist in, or null if plugins should not be loaded from the filesystem + */ + @SuppressWarnings("this-escape") + public PluginsLoader(Path modulesDirectory, Path pluginsDirectory) { + + Map> qualifiedExports = new HashMap<>(ModuleQualifiedExportsService.getBootServices()); + addServerExportsService(qualifiedExports); + + Set seenBundles = new LinkedHashSet<>(); + + // load (elasticsearch) module layers + if (modulesDirectory != null) { + try { + Set modules = PluginsUtils.getModuleBundles(modulesDirectory); + moduleDescriptors = modules.stream().map(PluginBundle::pluginDescriptor).toList(); + seenBundles.addAll(modules); + } catch (IOException ex) { + throw new IllegalStateException("Unable to initialize modules", ex); + } + } else { + moduleDescriptors = Collections.emptyList(); + } + + // load plugin layers + if (pluginsDirectory != null) { + try { + // TODO: remove this leniency, but tests bogusly rely on it + if (isAccessibleDirectory(pluginsDirectory, logger)) { + PluginsUtils.checkForFailedPluginRemovals(pluginsDirectory); + Set plugins = PluginsUtils.getPluginBundles(pluginsDirectory); + pluginDescriptors = plugins.stream().map(PluginBundle::pluginDescriptor).toList(); + seenBundles.addAll(plugins); + } else { + pluginDescriptors = Collections.emptyList(); + } + } catch (IOException ex) { + throw new IllegalStateException("Unable to initialize plugins", ex); + } + } else { + pluginDescriptors = Collections.emptyList(); + } + + this.loadedPluginLayers = Collections.unmodifiableMap(loadPluginLayers(seenBundles, qualifiedExports)); + } + + public List moduleDescriptors() { + return moduleDescriptors; + } + + public List pluginDescriptors() { + return pluginDescriptors; + } + + public Stream pluginLayers() { + return loadedPluginLayers.values().stream().map(Function.identity()); + } + + private Map loadPluginLayers( + Set bundles, + Map> qualifiedExports + ) { + Map loaded = new LinkedHashMap<>(); + Map> transitiveUrls = new HashMap<>(); + List sortedBundles = PluginsUtils.sortBundles(bundles); + if (sortedBundles.isEmpty() == false) { + Set systemLoaderURLs = JarHell.parseModulesAndClassPath(); + for (PluginBundle bundle : sortedBundles) { + PluginsUtils.checkBundleJarHell(systemLoaderURLs, bundle, transitiveUrls); + loadPluginLayer(bundle, loaded, qualifiedExports); + } + } + + return loaded; + } + + private void loadPluginLayer( + PluginBundle bundle, + Map loaded, + Map> qualifiedExports + ) { + String name = bundle.plugin.getName(); + logger.debug(() -> "Loading bundle: " + name); + + PluginsUtils.verifyCompatibility(bundle.plugin); + + // collect the list of extended plugins + List extendedPlugins = new ArrayList<>(); + for (String extendedPluginName : bundle.plugin.getExtendedPlugins()) { + LoadedPluginLayer extendedPlugin = loaded.get(extendedPluginName); + assert extendedPlugin != null; + assert extendedPlugin.spiClassLoader() != null : "All non-classpath plugins should be loaded with a classloader"; + extendedPlugins.add(extendedPlugin); + } + + final ClassLoader parentLoader = ExtendedPluginsClassLoader.create( + getClass().getClassLoader(), + extendedPlugins.stream().map(LoadedPluginLayer::spiClassLoader).toList() + ); + LayerAndLoader spiLayerAndLoader = null; + if (bundle.hasSPI()) { + spiLayerAndLoader = createSPI(bundle, parentLoader, extendedPlugins, qualifiedExports); + } + + final ClassLoader pluginParentLoader = spiLayerAndLoader == null ? parentLoader : spiLayerAndLoader.loader(); + final LayerAndLoader pluginLayerAndLoader = createPlugin( + bundle, + pluginParentLoader, + extendedPlugins, + spiLayerAndLoader, + qualifiedExports + ); + final ClassLoader pluginClassLoader = pluginLayerAndLoader.loader(); + + if (spiLayerAndLoader == null) { + // use full implementation for plugins extending this one + spiLayerAndLoader = pluginLayerAndLoader; + } + + loaded.put(name, new LoadedPluginLayer(bundle, pluginClassLoader, spiLayerAndLoader.loader, spiLayerAndLoader.layer)); + } + + static LayerAndLoader createSPI( + PluginBundle bundle, + ClassLoader parentLoader, + List extendedPlugins, + Map> qualifiedExports + ) { + final PluginDescriptor plugin = bundle.plugin; + if (plugin.getModuleName().isPresent()) { + logger.debug(() -> "Loading bundle: " + plugin.getName() + ", creating spi, modular"); + return createSpiModuleLayer( + bundle.spiUrls, + parentLoader, + extendedPlugins.stream().map(LoadedPluginLayer::spiModuleLayer).toList(), + qualifiedExports + ); + } else { + logger.debug(() -> "Loading bundle: " + plugin.getName() + ", creating spi, non-modular"); + return LayerAndLoader.ofLoader(URLClassLoader.newInstance(bundle.spiUrls.toArray(new URL[0]), parentLoader)); + } + } + + static LayerAndLoader createPlugin( + PluginBundle bundle, + ClassLoader pluginParentLoader, + List extendedPlugins, + LayerAndLoader spiLayerAndLoader, + Map> qualifiedExports + ) { + final PluginDescriptor plugin = bundle.plugin; + if (plugin.getModuleName().isPresent()) { + logger.debug(() -> "Loading bundle: " + plugin.getName() + ", modular"); + var parentLayers = Stream.concat( + Stream.ofNullable(spiLayerAndLoader != null ? spiLayerAndLoader.layer() : null), + extendedPlugins.stream().map(LoadedPluginLayer::spiModuleLayer) + ).toList(); + return createPluginModuleLayer(bundle, pluginParentLoader, parentLayers, qualifiedExports); + } else if (plugin.isStable()) { + logger.debug(() -> "Loading bundle: " + plugin.getName() + ", non-modular as synthetic module"); + return LayerAndLoader.ofLoader( + UberModuleClassLoader.getInstance( + pluginParentLoader, + ModuleLayer.boot(), + "synthetic." + toModuleName(plugin.getName()), + bundle.allUrls, + Set.of("org.elasticsearch.server") // TODO: instead of denying server, allow only jvm + stable API modules + ) + ); + } else { + logger.debug(() -> "Loading bundle: " + plugin.getName() + ", non-modular"); + return LayerAndLoader.ofLoader(URLClassLoader.newInstance(bundle.urls.toArray(URL[]::new), pluginParentLoader)); + } + } + + static LayerAndLoader createSpiModuleLayer( + Set urls, + ClassLoader parentLoader, + List parentLayers, + Map> qualifiedExports + ) { + // assert bundle.plugin.getModuleName().isPresent(); + return createModuleLayer( + null, // no entry point + spiModuleName(urls), + urlsToPaths(urls), + parentLoader, + parentLayers, + qualifiedExports + ); + } + + static LayerAndLoader createPluginModuleLayer( + PluginBundle bundle, + ClassLoader parentLoader, + List parentLayers, + Map> qualifiedExports + ) { + assert bundle.plugin.getModuleName().isPresent(); + return createModuleLayer( + bundle.plugin.getClassname(), + bundle.plugin.getModuleName().get(), + urlsToPaths(bundle.urls), + parentLoader, + parentLayers, + qualifiedExports + ); + } + + static LayerAndLoader createModuleLayer( + String className, + String moduleName, + Path[] paths, + ClassLoader parentLoader, + List parentLayers, + Map> qualifiedExports + ) { + logger.debug(() -> "Loading bundle: creating module layer and loader for module " + moduleName); + var finder = ModuleFinder.of(paths); + + var configuration = Configuration.resolveAndBind( + ModuleFinder.of(), + parentConfigurationOrBoot(parentLayers), + finder, + Set.of(moduleName) + ); + var controller = privilegedDefineModulesWithOneLoader(configuration, parentLayersOrBoot(parentLayers), parentLoader); + var pluginModule = controller.layer().findModule(moduleName).get(); + ensureEntryPointAccessible(controller, pluginModule, className); + // export/open upstream modules to this plugin module + exposeQualifiedExportsAndOpens(pluginModule, qualifiedExports); + // configure qualified exports/opens to other modules/plugins + addPluginExportsServices(qualifiedExports, controller); + logger.debug(() -> "Loading bundle: created module layer and loader for module " + moduleName); + return new LayerAndLoader(controller.layer(), privilegedFindLoader(controller.layer(), moduleName)); + } + + /** Determines the module name of the SPI module, given its URL. */ + static String spiModuleName(Set spiURLS) { + ModuleFinder finder = ModuleFinder.of(urlsToPaths(spiURLS)); + var mrefs = finder.findAll(); + assert mrefs.size() == 1 : "Expected a single module, got:" + mrefs; + return mrefs.stream().findFirst().get().descriptor().name(); + } + + // package-visible for testing + static String toModuleName(String name) { + String result = name.replaceAll("\\W+", ".") // replace non-alphanumeric character strings with dots + .replaceAll("(^[^A-Za-z_]*)", "") // trim non-alpha or underscore characters from start + .replaceAll("\\.$", "") // trim trailing dot + .toLowerCase(Locale.getDefault()); + assert ModuleSupport.isPackageName(result); + return result; + } + + static final String toPackageName(String className) { + assert className.endsWith(".") == false; + int index = className.lastIndexOf('.'); + if (index == -1) { + throw new IllegalStateException("invalid class name:" + className); + } + return className.substring(0, index); + } + + @SuppressForbidden(reason = "I need to convert URL's to Paths") + static final Path[] urlsToPaths(Set urls) { + return urls.stream().map(PluginsLoader::uncheckedToURI).map(PathUtils::get).toArray(Path[]::new); + } + + static final URI uncheckedToURI(URL url) { + try { + return url.toURI(); + } catch (URISyntaxException e) { + throw new AssertionError(new IOException(e)); + } + } + + private static List parentConfigurationOrBoot(List parentLayers) { + if (parentLayers == null || parentLayers.isEmpty()) { + return List.of(ModuleLayer.boot().configuration()); + } else { + return parentLayers.stream().map(ModuleLayer::configuration).toList(); + } + } + + /** Ensures that the plugins main class (its entry point), if any, is accessible to the server. */ + private static void ensureEntryPointAccessible(Controller controller, Module pluginModule, String className) { + if (className != null) { + controller.addOpens(pluginModule, toPackageName(className), serverModule); + } + } + + @SuppressWarnings("removal") + static Controller privilegedDefineModulesWithOneLoader(Configuration cf, List parentLayers, ClassLoader parentLoader) { + return AccessController.doPrivileged( + (PrivilegedAction) () -> ModuleLayer.defineModulesWithOneLoader(cf, parentLayers, parentLoader) + ); + } + + @SuppressWarnings("removal") + static ClassLoader privilegedFindLoader(ModuleLayer layer, String name) { + return AccessController.doPrivileged((PrivilegedAction) () -> layer.findLoader(name)); + } + + private static List parentLayersOrBoot(List parentLayers) { + if (parentLayers == null || parentLayers.isEmpty()) { + return List.of(ModuleLayer.boot()); + } else { + return parentLayers; + } + } + + protected void addServerExportsService(Map> qualifiedExports) { + var exportsService = new ModuleQualifiedExportsService(serverModule) { + @Override + protected void addExports(String pkg, Module target) { + serverModule.addExports(pkg, target); + } + + @Override + protected void addOpens(String pkg, Module target) { + serverModule.addOpens(pkg, target); + } + }; + addExportsService(qualifiedExports, exportsService, serverModule.getName()); + } + + private static void addPluginExportsServices(Map> qualifiedExports, Controller controller) { + for (Module module : controller.layer().modules()) { + var exportsService = new ModuleQualifiedExportsService(module) { + @Override + protected void addExports(String pkg, Module target) { + controller.addExports(module, pkg, target); + } + + @Override + protected void addOpens(String pkg, Module target) { + controller.addOpens(module, pkg, target); + } + }; + addExportsService(qualifiedExports, exportsService, module.getName()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java index d5dd6d62d615e..cfdb7aaf0b509 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -23,34 +23,22 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.PathUtils; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; -import org.elasticsearch.jdk.JarHell; -import org.elasticsearch.jdk.ModuleQualifiedExportsService; import org.elasticsearch.node.ReportingService; +import org.elasticsearch.plugins.PluginsLoader.PluginLayer; import org.elasticsearch.plugins.scanners.StablePluginsRegistry; import org.elasticsearch.plugins.spi.SPIClassIterator; import java.io.IOException; -import java.lang.ModuleLayer.Controller; -import java.lang.module.Configuration; -import java.lang.module.ModuleFinder; import java.lang.reflect.Constructor; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLClassLoader; import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -63,10 +51,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory; -import static org.elasticsearch.jdk.ModuleQualifiedExportsService.addExportsService; -import static org.elasticsearch.jdk.ModuleQualifiedExportsService.exposeQualifiedExportsAndOpens; - public class PluginsService implements ReportingService { public StablePluginsRegistry getStablePluginRegistry() { @@ -77,33 +61,18 @@ public StablePluginsRegistry getStablePluginRegistry() { * A loaded plugin is one for which Elasticsearch has successfully constructed an instance of the plugin's class * @param descriptor Metadata about the plugin, usually loaded from plugin properties * @param instance The constructed instance of the plugin's main class - * @param loader The classloader for the plugin - * @param layer The module layer for the plugin */ - record LoadedPlugin(PluginDescriptor descriptor, Plugin instance, ClassLoader loader, ModuleLayer layer) { + record LoadedPlugin(PluginDescriptor descriptor, Plugin instance) { LoadedPlugin { Objects.requireNonNull(descriptor); Objects.requireNonNull(instance); - Objects.requireNonNull(loader); - Objects.requireNonNull(layer); - } - - /** - * Creates a loaded classpath plugin. A classpath plugin is a plugin loaded - * by the system classloader and defined to the unnamed module of the boot layer. - */ - LoadedPlugin(PluginDescriptor descriptor, Plugin instance) { - this(descriptor, instance, PluginsService.class.getClassLoader(), ModuleLayer.boot()); } } private static final Logger logger = LogManager.getLogger(PluginsService.class); private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(PluginsService.class); - private final Settings settings; - private final Path configPath; - /** * We keep around a list of plugins and modules. The order of * this list is that which the plugins and modules were loaded in. @@ -117,69 +86,32 @@ record LoadedPlugin(PluginDescriptor descriptor, Plugin instance, ClassLoader lo /** * Constructs a new PluginService * - * @param settings The settings of the system - * @param modulesDirectory The directory modules exist in, or null if modules should not be loaded from the filesystem - * @param pluginsDirectory The directory plugins exist in, or null if plugins should not be loaded from the filesystem + * @param settings The settings for this node + * @param configPath The configuration path for this node + * @param pluginsLoader the information required to complete loading of plugins */ - @SuppressWarnings("this-escape") - public PluginsService(Settings settings, Path configPath, Path modulesDirectory, Path pluginsDirectory) { - this.settings = settings; - this.configPath = configPath; - - Map> qualifiedExports = new HashMap<>(ModuleQualifiedExportsService.getBootServices()); - addServerExportsService(qualifiedExports); - - Set seenBundles = new LinkedHashSet<>(); - - // load modules - List modulesList = new ArrayList<>(); - Set moduleNameList = new HashSet<>(); - if (modulesDirectory != null) { - try { - Set modules = PluginsUtils.getModuleBundles(modulesDirectory); - modules.stream().map(PluginBundle::pluginDescriptor).forEach(m -> { - modulesList.add(m); - moduleNameList.add(m.getName()); - }); - seenBundles.addAll(modules); - } catch (IOException ex) { - throw new IllegalStateException("Unable to initialize modules", ex); - } - } + public PluginsService(Settings settings, Path configPath, PluginsLoader pluginsLoader) { + Map loadedPlugins = loadPluginBundles(settings, configPath, pluginsLoader); - // load plugins - List pluginsList = new ArrayList<>(); - if (pluginsDirectory != null) { - try { - // TODO: remove this leniency, but tests bogusly rely on it - if (isAccessibleDirectory(pluginsDirectory, logger)) { - PluginsUtils.checkForFailedPluginRemovals(pluginsDirectory); - Set plugins = PluginsUtils.getPluginBundles(pluginsDirectory); - plugins.stream().map(PluginBundle::pluginDescriptor).forEach(pluginsList::add); - seenBundles.addAll(plugins); - } - } catch (IOException ex) { - throw new IllegalStateException("Unable to initialize plugins", ex); - } - } - - LinkedHashMap loadedPlugins = loadBundles(seenBundles, qualifiedExports); + var modulesDescriptors = pluginsLoader.moduleDescriptors(); + var pluginDescriptors = pluginsLoader.pluginDescriptors(); var inspector = PluginIntrospector.getInstance(); - this.info = new PluginsAndModules(getRuntimeInfos(inspector, pluginsList, loadedPlugins), modulesList); + this.info = new PluginsAndModules(getRuntimeInfos(inspector, pluginDescriptors, loadedPlugins), modulesDescriptors); this.plugins = List.copyOf(loadedPlugins.values()); - checkDeprecations(inspector, pluginsList, loadedPlugins); + checkDeprecations(inspector, pluginDescriptors, loadedPlugins); checkMandatoryPlugins( - pluginsList.stream().map(PluginDescriptor::getName).collect(Collectors.toSet()), + pluginDescriptors.stream().map(PluginDescriptor::getName).collect(Collectors.toSet()), new HashSet<>(MANDATORY_SETTING.get(settings)) ); // we don't log jars in lib/ we really shouldn't log modules, // but for now: just be transparent so we can debug any potential issues + Set moduleNames = new HashSet<>(modulesDescriptors.stream().map(PluginDescriptor::getName).toList()); for (String name : loadedPlugins.keySet()) { - if (moduleNameList.contains(name)) { + if (moduleNames.contains(name)) { logger.info("loaded module [{}]", name); } else { logger.info("loaded plugin [{}]", name); @@ -282,23 +214,11 @@ protected List plugins() { return this.plugins; } - private LinkedHashMap loadBundles( - Set bundles, - Map> qualifiedExports - ) { - LinkedHashMap loaded = new LinkedHashMap<>(); - Map> transitiveUrls = new HashMap<>(); - List sortedBundles = PluginsUtils.sortBundles(bundles); - if (sortedBundles.isEmpty() == false) { - Set systemLoaderURLs = JarHell.parseModulesAndClassPath(); - for (PluginBundle bundle : sortedBundles) { - PluginsUtils.checkBundleJarHell(systemLoaderURLs, bundle, transitiveUrls); - loadBundle(bundle, loaded, qualifiedExports); - } - } - - loadExtensions(loaded.values()); - return loaded; + private Map loadPluginBundles(Settings settings, Path configPath, PluginsLoader pluginsLoader) { + Map loadedPlugins = new LinkedHashMap<>(); + pluginsLoader.pluginLayers().forEach(pl -> loadBundle(pl, loadedPlugins, settings, configPath)); + loadExtensions(loadedPlugins.values()); + return loadedPlugins; } // package-private for test visibility @@ -443,68 +363,43 @@ private static String extensionConstructorMessage(Class extensi return "constructor for extension [" + extensionClass.getName() + "] of type [" + extensionPointType.getName() + "]"; } - private void loadBundle( - PluginBundle bundle, - Map loaded, - Map> qualifiedExports - ) { - String name = bundle.plugin.getName(); - logger.debug(() -> "Loading bundle: " + name); - - PluginsUtils.verifyCompatibility(bundle.plugin); + private void loadBundle(PluginLayer pluginLayer, Map loadedPlugins, Settings settings, Path configPath) { + String name = pluginLayer.pluginBundle().plugin.getName(); + logger.debug(() -> "Loading plugin bundle: " + name); - // collect the list of extended plugins + // validate the list of extended plugins List extendedPlugins = new ArrayList<>(); - for (String extendedPluginName : bundle.plugin.getExtendedPlugins()) { - LoadedPlugin extendedPlugin = loaded.get(extendedPluginName); + for (String extendedPluginName : pluginLayer.pluginBundle().plugin.getExtendedPlugins()) { + LoadedPlugin extendedPlugin = loadedPlugins.get(extendedPluginName); assert extendedPlugin != null; if (ExtensiblePlugin.class.isInstance(extendedPlugin.instance()) == false) { throw new IllegalStateException("Plugin [" + name + "] cannot extend non-extensible plugin [" + extendedPluginName + "]"); } - assert extendedPlugin.loader() != null : "All non-classpath plugins should be loaded with a classloader"; extendedPlugins.add(extendedPlugin); logger.debug( - () -> "Loading bundle: " + name + ", ext plugins: " + extendedPlugins.stream().map(lp -> lp.descriptor().getName()).toList() + () -> "Loading plugin bundle: " + + name + + ", ext plugins: " + + extendedPlugins.stream().map(lp -> lp.descriptor().getName()).toList() ); } - final ClassLoader parentLoader = ExtendedPluginsClassLoader.create( - getClass().getClassLoader(), - extendedPlugins.stream().map(LoadedPlugin::loader).toList() - ); - LayerAndLoader spiLayerAndLoader = null; - if (bundle.hasSPI()) { - spiLayerAndLoader = createSPI(bundle, parentLoader, extendedPlugins, qualifiedExports); - } - - final ClassLoader pluginParentLoader = spiLayerAndLoader == null ? parentLoader : spiLayerAndLoader.loader(); - final LayerAndLoader pluginLayerAndLoader = createPlugin( - bundle, - pluginParentLoader, - extendedPlugins, - spiLayerAndLoader, - qualifiedExports - ); - final ClassLoader pluginClassLoader = pluginLayerAndLoader.loader(); - - if (spiLayerAndLoader == null) { - // use full implementation for plugins extending this one - spiLayerAndLoader = pluginLayerAndLoader; - } + PluginBundle pluginBundle = pluginLayer.pluginBundle(); + ClassLoader pluginClassLoader = pluginLayer.pluginClassLoader(); // reload SPI with any new services from the plugin - reloadLuceneSPI(pluginClassLoader); + reloadLuceneSPI(pluginLayer.pluginClassLoader()); ClassLoader cl = Thread.currentThread().getContextClassLoader(); try { // Set context class loader to plugin's class loader so that plugins // that have dependencies with their own SPI endpoints have a chance to load // and initialize them appropriately. - privilegedSetContextClassLoader(pluginClassLoader); + privilegedSetContextClassLoader(pluginLayer.pluginClassLoader()); Plugin plugin; - if (bundle.pluginDescriptor().isStable()) { - stablePluginsRegistry.scanBundleForStablePlugins(bundle, pluginClassLoader); + if (pluginBundle.pluginDescriptor().isStable()) { + stablePluginsRegistry.scanBundleForStablePlugins(pluginBundle, pluginClassLoader); /* Contrary to old plugins we don't need an instance of the plugin here. Stable plugin register components (like CharFilterFactory) in stable plugin registry, which is then used in AnalysisModule @@ -514,16 +409,16 @@ Stable plugin register components (like CharFilterFactory) in stable plugin regi We need to pass a name though so that we can show that a plugin was loaded (via cluster state api) This might need to be revisited once support for settings is added */ - plugin = new StablePluginPlaceHolder(bundle.plugin.getName()); + plugin = new StablePluginPlaceHolder(pluginBundle.plugin.getName()); } else { - Class pluginClass = loadPluginClass(bundle.plugin.getClassname(), pluginClassLoader); + Class pluginClass = loadPluginClass(pluginBundle.plugin.getClassname(), pluginClassLoader); if (pluginClassLoader != pluginClass.getClassLoader()) { throw new IllegalStateException( "Plugin [" + name + "] must reference a class loader local Plugin class [" - + bundle.plugin.getClassname() + + pluginBundle.plugin.getClassname() + "] (class loader [" + pluginClass.getClassLoader() + "])" @@ -531,75 +426,12 @@ We need to pass a name though so that we can show that a plugin was loaded (via } plugin = loadPlugin(pluginClass, settings, configPath); } - loaded.put(name, new LoadedPlugin(bundle.plugin, plugin, spiLayerAndLoader.loader(), spiLayerAndLoader.layer())); + loadedPlugins.put(name, new LoadedPlugin(pluginBundle.plugin, plugin)); } finally { privilegedSetContextClassLoader(cl); } } - static LayerAndLoader createSPI( - PluginBundle bundle, - ClassLoader parentLoader, - List extendedPlugins, - Map> qualifiedExports - ) { - final PluginDescriptor plugin = bundle.plugin; - if (plugin.getModuleName().isPresent()) { - logger.debug(() -> "Loading bundle: " + plugin.getName() + ", creating spi, modular"); - return createSpiModuleLayer( - bundle.spiUrls, - parentLoader, - extendedPlugins.stream().map(LoadedPlugin::layer).toList(), - qualifiedExports - ); - } else { - logger.debug(() -> "Loading bundle: " + plugin.getName() + ", creating spi, non-modular"); - return LayerAndLoader.ofLoader(URLClassLoader.newInstance(bundle.spiUrls.toArray(new URL[0]), parentLoader)); - } - } - - static LayerAndLoader createPlugin( - PluginBundle bundle, - ClassLoader pluginParentLoader, - List extendedPlugins, - LayerAndLoader spiLayerAndLoader, - Map> qualifiedExports - ) { - final PluginDescriptor plugin = bundle.plugin; - if (plugin.getModuleName().isPresent()) { - logger.debug(() -> "Loading bundle: " + plugin.getName() + ", modular"); - var parentLayers = Stream.concat( - Stream.ofNullable(spiLayerAndLoader != null ? spiLayerAndLoader.layer() : null), - extendedPlugins.stream().map(LoadedPlugin::layer) - ).toList(); - return createPluginModuleLayer(bundle, pluginParentLoader, parentLayers, qualifiedExports); - } else if (plugin.isStable()) { - logger.debug(() -> "Loading bundle: " + plugin.getName() + ", non-modular as synthetic module"); - return LayerAndLoader.ofLoader( - UberModuleClassLoader.getInstance( - pluginParentLoader, - ModuleLayer.boot(), - "synthetic." + toModuleName(plugin.getName()), - bundle.allUrls, - Set.of("org.elasticsearch.server") // TODO: instead of denying server, allow only jvm + stable API modules - ) - ); - } else { - logger.debug(() -> "Loading bundle: " + plugin.getName() + ", non-modular"); - return LayerAndLoader.ofLoader(URLClassLoader.newInstance(bundle.urls.toArray(URL[]::new), pluginParentLoader)); - } - } - - // package-visible for testing - static String toModuleName(String name) { - String result = name.replaceAll("\\W+", ".") // replace non-alphanumeric character strings with dots - .replaceAll("(^[^A-Za-z_]*)", "") // trim non-alpha or underscore characters from start - .replaceAll("\\.$", "") // trim trailing dot - .toLowerCase(Locale.getDefault()); - assert ModuleSupport.isPackageName(result); - return result; - } - private static void checkDeprecations( PluginIntrospector inspector, List pluginDescriptors, @@ -706,173 +538,6 @@ public final Stream filterPlugins(Class type) { return plugins().stream().filter(x -> type.isAssignableFrom(x.instance().getClass())).map(p -> ((T) p.instance())); } - static LayerAndLoader createPluginModuleLayer( - PluginBundle bundle, - ClassLoader parentLoader, - List parentLayers, - Map> qualifiedExports - ) { - assert bundle.plugin.getModuleName().isPresent(); - return createModuleLayer( - bundle.plugin.getClassname(), - bundle.plugin.getModuleName().get(), - urlsToPaths(bundle.urls), - parentLoader, - parentLayers, - qualifiedExports - ); - } - - static final LayerAndLoader createSpiModuleLayer( - Set urls, - ClassLoader parentLoader, - List parentLayers, - Map> qualifiedExports - ) { - // assert bundle.plugin.getModuleName().isPresent(); - return createModuleLayer( - null, // no entry point - spiModuleName(urls), - urlsToPaths(urls), - parentLoader, - parentLayers, - qualifiedExports - ); - } - - private static final Module serverModule = PluginsService.class.getModule(); - - static LayerAndLoader createModuleLayer( - String className, - String moduleName, - Path[] paths, - ClassLoader parentLoader, - List parentLayers, - Map> qualifiedExports - ) { - logger.debug(() -> "Loading bundle: creating module layer and loader for module " + moduleName); - var finder = ModuleFinder.of(paths); - - var configuration = Configuration.resolveAndBind( - ModuleFinder.of(), - parentConfigurationOrBoot(parentLayers), - finder, - Set.of(moduleName) - ); - var controller = privilegedDefineModulesWithOneLoader(configuration, parentLayersOrBoot(parentLayers), parentLoader); - var pluginModule = controller.layer().findModule(moduleName).get(); - ensureEntryPointAccessible(controller, pluginModule, className); - // export/open upstream modules to this plugin module - exposeQualifiedExportsAndOpens(pluginModule, qualifiedExports); - // configure qualified exports/opens to other modules/plugins - addPluginExportsServices(qualifiedExports, controller); - logger.debug(() -> "Loading bundle: created module layer and loader for module " + moduleName); - return new LayerAndLoader(controller.layer(), privilegedFindLoader(controller.layer(), moduleName)); - } - - private static List parentLayersOrBoot(List parentLayers) { - if (parentLayers == null || parentLayers.isEmpty()) { - return List.of(ModuleLayer.boot()); - } else { - return parentLayers; - } - } - - private static List parentConfigurationOrBoot(List parentLayers) { - if (parentLayers == null || parentLayers.isEmpty()) { - return List.of(ModuleLayer.boot().configuration()); - } else { - return parentLayers.stream().map(ModuleLayer::configuration).toList(); - } - } - - /** Ensures that the plugins main class (its entry point), if any, is accessible to the server. */ - private static void ensureEntryPointAccessible(Controller controller, Module pluginModule, String className) { - if (className != null) { - controller.addOpens(pluginModule, toPackageName(className), serverModule); - } - } - - protected void addServerExportsService(Map> qualifiedExports) { - final Module serverModule = PluginsService.class.getModule(); - var exportsService = new ModuleQualifiedExportsService(serverModule) { - @Override - protected void addExports(String pkg, Module target) { - serverModule.addExports(pkg, target); - } - - @Override - protected void addOpens(String pkg, Module target) { - serverModule.addOpens(pkg, target); - } - }; - addExportsService(qualifiedExports, exportsService, serverModule.getName()); - } - - private static void addPluginExportsServices(Map> qualifiedExports, Controller controller) { - for (Module module : controller.layer().modules()) { - var exportsService = new ModuleQualifiedExportsService(module) { - @Override - protected void addExports(String pkg, Module target) { - controller.addExports(module, pkg, target); - } - - @Override - protected void addOpens(String pkg, Module target) { - controller.addOpens(module, pkg, target); - } - }; - addExportsService(qualifiedExports, exportsService, module.getName()); - } - } - - /** Determines the module name of the SPI module, given its URL. */ - static String spiModuleName(Set spiURLS) { - ModuleFinder finder = ModuleFinder.of(urlsToPaths(spiURLS)); - var mrefs = finder.findAll(); - assert mrefs.size() == 1 : "Expected a single module, got:" + mrefs; - return mrefs.stream().findFirst().get().descriptor().name(); - } - - /** - * Tuple of module layer and loader. - * Modular Plugins have a plugin specific loader and layer. - * Non-Modular plugins have a plugin specific loader and the boot layer. - */ - record LayerAndLoader(ModuleLayer layer, ClassLoader loader) { - - LayerAndLoader { - Objects.requireNonNull(layer); - Objects.requireNonNull(loader); - } - - static LayerAndLoader ofLoader(ClassLoader loader) { - return new LayerAndLoader(ModuleLayer.boot(), loader); - } - } - - @SuppressForbidden(reason = "I need to convert URL's to Paths") - static final Path[] urlsToPaths(Set urls) { - return urls.stream().map(PluginsService::uncheckedToURI).map(PathUtils::get).toArray(Path[]::new); - } - - static final URI uncheckedToURI(URL url) { - try { - return url.toURI(); - } catch (URISyntaxException e) { - throw new AssertionError(new IOException(e)); - } - } - - static final String toPackageName(String className) { - assert className.endsWith(".") == false; - int index = className.lastIndexOf('.'); - if (index == -1) { - throw new IllegalStateException("invalid class name:" + className); - } - return className.substring(0, index); - } - @SuppressWarnings("removal") private static void privilegedSetContextClassLoader(ClassLoader loader) { AccessController.doPrivileged((PrivilegedAction) () -> { @@ -880,16 +545,4 @@ private static void privilegedSetContextClassLoader(ClassLoader loader) { return null; }); } - - @SuppressWarnings("removal") - static Controller privilegedDefineModulesWithOneLoader(Configuration cf, List parentLayers, ClassLoader parentLoader) { - return AccessController.doPrivileged( - (PrivilegedAction) () -> ModuleLayer.defineModulesWithOneLoader(cf, parentLayers, parentLoader) - ); - } - - @SuppressWarnings("removal") - static ClassLoader privilegedFindLoader(ModuleLayer layer, String name) { - return AccessController.doPrivileged((PrivilegedAction) () -> layer.findLoader(name)); - } } diff --git a/server/src/test/java/org/elasticsearch/plugins/PluginsLoaderTests.java b/server/src/test/java/org/elasticsearch/plugins/PluginsLoaderTests.java new file mode 100644 index 0000000000000..059cb15551acb --- /dev/null +++ b/server/src/test/java/org/elasticsearch/plugins/PluginsLoaderTests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class PluginsLoaderTests extends ESTestCase { + + public void testToModuleName() { + assertThat(PluginsLoader.toModuleName("module.name"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("module-name"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("module-name1"), equalTo("module.name1")); + assertThat(PluginsLoader.toModuleName("1module-name"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("module-name!"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("module!@#name!"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("!module-name!"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("module_name"), equalTo("module_name")); + assertThat(PluginsLoader.toModuleName("-module-name-"), equalTo("module.name")); + assertThat(PluginsLoader.toModuleName("_module_name"), equalTo("_module_name")); + assertThat(PluginsLoader.toModuleName("_"), equalTo("_")); + } +} diff --git a/server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java b/server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java index f927a12b50da3..b84f1d2c7635c 100644 --- a/server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/plugins/PluginsServiceTests.java @@ -66,12 +66,12 @@ public class PluginsServiceTests extends ESTestCase { public static class FilterablePlugin extends Plugin implements ScriptPlugin {} static PluginsService newPluginsService(Settings settings) { - return new PluginsService(settings, null, null, TestEnvironment.newEnvironment(settings).pluginsFile()) { + return new PluginsService(settings, null, new PluginsLoader(null, TestEnvironment.newEnvironment(settings).pluginsFile()) { @Override protected void addServerExportsService(Map> qualifiedExports) { // tests don't run modular } - }; + }); } static PluginsService newMockPluginsService(List> classpathPlugins) { @@ -875,20 +875,6 @@ public void testCanCreateAClassLoader() { assertEquals(this.getClass().getClassLoader(), loader.getParent()); } - public void testToModuleName() { - assertThat(PluginsService.toModuleName("module.name"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("module-name"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("module-name1"), equalTo("module.name1")); - assertThat(PluginsService.toModuleName("1module-name"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("module-name!"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("module!@#name!"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("!module-name!"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("module_name"), equalTo("module_name")); - assertThat(PluginsService.toModuleName("-module-name-"), equalTo("module.name")); - assertThat(PluginsService.toModuleName("_module_name"), equalTo("_module_name")); - assertThat(PluginsService.toModuleName("_"), equalTo("_")); - } - static final class Loader extends ClassLoader { Loader(ClassLoader parent) { super(parent); @@ -896,16 +882,17 @@ static final class Loader extends ClassLoader { } // Closes the URLClassLoaders and UberModuleClassloaders of plugins loaded by the given plugin service. + // We can use the direct ClassLoader from the plugin because tests do not use any parent SPI ClassLoaders. static void closePluginLoaders(PluginsService pluginService) { for (var lp : pluginService.plugins()) { - if (lp.loader() instanceof URLClassLoader urlClassLoader) { + if (lp.instance().getClass().getClassLoader() instanceof URLClassLoader urlClassLoader) { try { PrivilegedOperations.closeURLClassLoader(urlClassLoader); } catch (IOException unexpected) { throw new UncheckedIOException(unexpected); } } - if (lp.loader() instanceof UberModuleClassLoader loader) { + if (lp.instance().getClass().getClassLoader() instanceof UberModuleClassLoader loader) { try { PrivilegedOperations.closeURLClassLoader(loader.getInternalLoader()); } catch (Exception e) { diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 40fb4f91c77d0..38c7b1eb04772 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -31,6 +31,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.MockPluginsService; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.readiness.MockReadinessService; import org.elasticsearch.readiness.ReadinessService; @@ -279,10 +280,11 @@ private MockNode( final Collection> classpathPlugins, final boolean forbidPrivateIndexSettings ) { - super(NodeConstruction.prepareConstruction(environment, new MockServiceProvider() { + super(NodeConstruction.prepareConstruction(environment, null, new MockServiceProvider() { + @Override - PluginsService newPluginService(Environment environment, Settings settings) { - return new MockPluginsService(settings, environment, classpathPlugins); + PluginsService newPluginService(Environment environment, PluginsLoader pluginsLoader) { + return new MockPluginsService(environment.settings(), environment, classpathPlugins); } }, forbidPrivateIndexSettings)); diff --git a/test/framework/src/main/java/org/elasticsearch/plugins/MockPluginsService.java b/test/framework/src/main/java/org/elasticsearch/plugins/MockPluginsService.java index e4734f9cf021e..d51b2cfb450bc 100644 --- a/test/framework/src/main/java/org/elasticsearch/plugins/MockPluginsService.java +++ b/test/framework/src/main/java/org/elasticsearch/plugins/MockPluginsService.java @@ -20,7 +20,6 @@ import org.elasticsearch.plugins.spi.SPIClassIterator; import java.lang.reflect.Constructor; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -44,14 +43,18 @@ public class MockPluginsService extends PluginsService { * @param classpathPlugins Plugins that exist in the classpath which should be loaded */ public MockPluginsService(Settings settings, Environment environment, Collection> classpathPlugins) { - super(settings, environment.configFile(), environment.modulesFile(), environment.pluginsFile()); + super(settings, environment.configFile(), new PluginsLoader(environment.modulesFile(), environment.pluginsFile()) { - final Path configPath = environment.configFile(); + @Override + protected void addServerExportsService(Map> qualifiedExports) { + // tests don't run modular + } + }); List pluginsLoaded = new ArrayList<>(); for (Class pluginClass : classpathPlugins) { - Plugin plugin = loadPlugin(pluginClass, settings, configPath); + Plugin plugin = loadPlugin(pluginClass, settings, environment.configFile()); PluginDescriptor pluginInfo = new PluginDescriptor( pluginClass.getName(), "classpath plugin", @@ -69,7 +72,7 @@ public MockPluginsService(Settings settings, Environment environment, Collection if (logger.isTraceEnabled()) { logger.trace("plugin loaded from classpath [{}]", pluginInfo); } - pluginsLoaded.add(new LoadedPlugin(pluginInfo, plugin, pluginClass.getClassLoader(), ModuleLayer.boot())); + pluginsLoaded.add(new LoadedPlugin(pluginInfo, plugin)); } loadExtensions(pluginsLoaded); this.classpathPlugins = List.copyOf(pluginsLoaded); @@ -169,9 +172,4 @@ private static List createExtensions( } return extensions; } - - @Override - protected void addServerExportsService(Map> qualifiedExports) { - // tests don't run modular - } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherScheduleEngineBenchmark.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherScheduleEngineBenchmark.java index 1691a464d8061..99fb626ad9474 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherScheduleEngineBenchmark.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/WatcherScheduleEngineBenchmark.java @@ -16,11 +16,13 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.PluginsLoader; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; @@ -96,18 +98,18 @@ public static void main(String[] args) throws Exception { ); System.out.println("and heap_max=" + JvmInfo.jvmInfo().getMem().getHeapMax()); + Environment internalNodeEnv = InternalSettingsPreparer.prepareEnvironment( + Settings.builder().put(SETTINGS).put("node.data", false).build(), + emptyMap(), + null, + () -> { + throw new IllegalArgumentException("settings must have [node.name]"); + } + ); + // First clean everything and index the watcher (but not via put alert api!) try ( - Node node = new Node( - InternalSettingsPreparer.prepareEnvironment( - Settings.builder().put(SETTINGS).put("node.data", false).build(), - emptyMap(), - null, - () -> { - throw new IllegalArgumentException("settings must have [node.name]"); - } - ) - ).start() + Node node = new Node(internalNodeEnv, new PluginsLoader(internalNodeEnv.modulesFile(), internalNodeEnv.pluginsFile())).start() ) { final Client client = node.client(); ClusterHealthResponse response = client.admin().cluster().prepareHealth(TimeValue.THIRTY_SECONDS).setWaitForNodes("2").get(); From 4cc9f5de6c14bd7b96df30977e17995c389a1162 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2024 16:09:46 -0700 Subject: [PATCH 3/4] Revert Remove direct cloning of BytesTransportRequests (#117200) Reverts #114808 and unmutes #117024 which was a related failure. --- muted-tests.yml | 3 --- .../test/transport/MockTransportService.java | 20 +++++++++++++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 0331f705951f1..710cdea8f1564 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -237,9 +237,6 @@ tests: - class: org.elasticsearch.upgrades.QueryBuilderBWCIT method: testQueryBuilderBWC {cluster=UPGRADED} issue: https://github.com/elastic/elasticsearch/issues/116990 -- class: org.elasticsearch.discovery.ClusterDisruptionIT - method: testAckedIndexing - issue: https://github.com/elastic/elasticsearch/issues/117024 - class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT method: test {yaml=reference/esql/esql-across-clusters/line_197} issue: https://github.com/elastic/elasticsearch/issues/117099 diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 18c591166e720..fd376fcd07688 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -39,6 +39,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; @@ -49,6 +50,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -584,8 +586,13 @@ public void sendRequest( // poor mans request cloning... BytesStreamOutput bStream = new BytesStreamOutput(); request.writeTo(bStream); - RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); - final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); + final TransportRequest clonedRequest; + if (request instanceof BytesTransportRequest) { + clonedRequest = copyRawBytesForBwC(bStream); + } else { + RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); + clonedRequest = reg.newRequest(bStream.bytes().streamInput()); + } assert clonedRequest.getClass().equals(MasterNodeRequestHelper.unwrapTermOverride(request).getClass()) : clonedRequest + " vs " + request; @@ -633,6 +640,15 @@ protected void doRun() throws IOException { } } + // Some request handlers read back a BytesTransportRequest + // into a different class that cannot be re-serialized (i.e. JOIN_VALIDATE_ACTION_NAME), + // in those cases we just copy the raw bytes back to a BytesTransportRequest. + // This is only needed for the BwC for JOIN_VALIDATE_ACTION_NAME and can be removed in the next major + @UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION) + private static TransportRequest copyRawBytesForBwC(BytesStreamOutput bStream) throws IOException { + return new BytesTransportRequest(bStream.bytes().streamInput()); + } + @Override public void clearCallback() { synchronized (this) { From afa3abcec5968161cbb2aa126104ea8daad7c3d2 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 20 Nov 2024 17:34:44 -0600 Subject: [PATCH 4/4] Reindex data stream persistent task (#116780) --- .../ReindexDataStreamTransportActionIT.java | 152 +++++++++++++++++ .../datastreams/DataStreamsPlugin.java | 65 +++++++- .../ReindexDataStreamTransportAction.java | 93 +++++++++++ ...indexDataStreamPersistentTaskExecutor.java | 121 ++++++++++++++ .../ReindexDataStreamPersistentTaskState.java | 63 +++++++ .../task/ReindexDataStreamStatus.java | 95 +++++++++++ .../task/ReindexDataStreamTask.java | 86 ++++++++++ .../task/ReindexDataStreamTaskParams.java | 86 ++++++++++ ...dexDataStreamPersistentTaskStateTests.java | 38 +++++ .../task/ReindexDataStreamStatusTests.java | 157 ++++++++++++++++++ .../ReindexDataStreamTaskParamsTests.java | 70 ++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../datastreams/ReindexDataStreamAction.java | 119 +++++++++++++ .../ReindexDataStreamResponseTests.java | 52 ++++++ 14 files changed, 1197 insertions(+), 1 deletion(-) create mode 100644 modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java create mode 100644 modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java create mode 100644 modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java create mode 100644 modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java new file mode 100644 index 0000000000000..fdc96892d4b27 --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportActionIT.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class ReindexDataStreamTransportActionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class); + } + + public void testNonExistentDataStream() { + String nonExistentDataStreamName = randomAlphaOfLength(50); + ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(nonExistentDataStreamName); + assertThrows( + ResourceNotFoundException.class, + () -> client().execute(new ActionType(ReindexDataStreamAction.NAME), reindexDataStreamRequest) + .actionGet() + ); + } + + public void testAlreadyUpToDateDataStream() throws Exception { + String dataStreamName = randomAlphaOfLength(50).toLowerCase(Locale.ROOT); + ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest(dataStreamName); + createDataStream(dataStreamName); + ReindexDataStreamResponse response = client().execute( + new ActionType(ReindexDataStreamAction.NAME), + reindexDataStreamRequest + ).actionGet(); + String persistentTaskId = response.getTaskId(); + assertThat(persistentTaskId, equalTo("reindex-data-stream-" + dataStreamName)); + AtomicReference runningTask = new AtomicReference<>(); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + TaskManager taskManager = transportService.getTaskManager(); + Map tasksMap = taskManager.getCancellableTasks(); + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof ReindexDataStreamTask + && persistentTaskId.equals((((ReindexDataStreamTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + optionalTask.ifPresent( + longCancellableTaskEntry -> runningTask.compareAndSet(null, (ReindexDataStreamTask) longCancellableTaskEntry.getValue()) + ); + } + ReindexDataStreamTask task = runningTask.get(); + assertNotNull(task); + assertThat(task.getStatus().complete(), equalTo(true)); + assertNull(task.getStatus().exception()); + assertThat(task.getStatus().pending(), equalTo(0)); + assertThat(task.getStatus().inProgress(), equalTo(0)); + assertThat(task.getStatus().errors().size(), equalTo(0)); + } + + private void createDataStream(String dataStreamName) { + final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest = + new TransportPutComposableIndexTemplateAction.Request("my-template"); + putComposableTemplateRequest.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .template(Template.builder().build()) + .build() + ); + final AcknowledgedResponse putComposableTemplateResponse = safeGet( + client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest) + ); + assertThat(putComposableTemplateResponse.isAcknowledged(), is(true)); + + final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStreamName + ); + final AcknowledgedResponse createDataStreamResponse = safeGet( + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest) + ); + assertThat(createDataStreamResponse.isAcknowledged(), is(true)); + indexDocs(dataStreamName); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + indexDocs(dataStreamName); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + } + + private void indexDocs(String dataStreamName) { + int docs = randomIntBetween(5, 10); + CountDownLatch countDownLatch = new CountDownLatch(docs); + for (int i = 0; i < docs; i++) { + var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + final String doc = "{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"something cool happened\" }"; + indexRequest.source(doc, XContentType.JSON); + client().index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(DocWriteResponse docWriteResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Indexing request should have succeeded eventually, failed with " + e.getMessage()); + } + }); + } + safeAwait(countDownLatch); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index cb7445705537a..2f3b63d27ca35 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -19,19 +19,23 @@ import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction; @@ -40,6 +44,7 @@ import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction; import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; +import org.elasticsearch.datastreams.action.ReindexDataStreamTransportAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -73,14 +78,27 @@ import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; +import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskExecutor; +import org.elasticsearch.datastreams.task.ReindexDataStreamPersistentTaskState; +import org.elasticsearch.datastreams.task.ReindexDataStreamStatus; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; import org.elasticsearch.index.IndexSettingProvider; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.HealthPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; import java.io.IOException; import java.time.Clock; @@ -93,7 +111,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; -public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin { +public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, PersistentTaskPlugin { public static final Setting TIME_SERIES_POLL_INTERVAL = Setting.timeSetting( "time_series.poll_interval", @@ -244,6 +262,7 @@ public Collection createComponents(PluginServices services) { actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class)); actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); } + actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); return actions; } @@ -302,4 +321,48 @@ public void close() throws IOException { public Collection getHealthIndicatorServices() { return List.of(dataStreamLifecycleHealthIndicatorService.get()); } + + @Override + public List getNamedXContent() { + return List.of( + new NamedXContentRegistry.Entry( + PersistentTaskState.class, + new ParseField(ReindexDataStreamPersistentTaskState.NAME), + ReindexDataStreamPersistentTaskState::fromXContent + ), + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(ReindexDataStreamTaskParams.NAME), + ReindexDataStreamTaskParams::fromXContent + ) + ); + } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + PersistentTaskState.class, + ReindexDataStreamPersistentTaskState.NAME, + ReindexDataStreamPersistentTaskState::new + ), + new NamedWriteableRegistry.Entry( + PersistentTaskParams.class, + ReindexDataStreamTaskParams.NAME, + ReindexDataStreamTaskParams::new + ), + new NamedWriteableRegistry.Entry(Task.Status.class, ReindexDataStreamStatus.NAME, ReindexDataStreamStatus::new) + ); + } + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return List.of(new ReindexDataStreamPersistentTaskExecutor(client, clusterService, ReindexDataStreamTask.TASK_NAME, threadPool)); + } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java new file mode 100644 index 0000000000000..0a86985c6c7b2 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/ReindexDataStreamTransportAction.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamRequest; +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.datastreams.task.ReindexDataStreamTask; +import org.elasticsearch.datastreams.task.ReindexDataStreamTaskParams; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/* + * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation + * of the persistent task, it responds with the persistent task id so that the user can monitor the persistent task. + */ +public class ReindexDataStreamTransportAction extends HandledTransportAction { + private final PersistentTasksService persistentTasksService; + private final TransportService transportService; + private final ClusterService clusterService; + + @Inject + public ReindexDataStreamTransportAction( + TransportService transportService, + ActionFilters actionFilters, + PersistentTasksService persistentTasksService, + ClusterService clusterService + ) { + super( + ReindexDataStreamAction.NAME, + true, + transportService, + actionFilters, + ReindexDataStreamRequest::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.transportService = transportService; + this.persistentTasksService = persistentTasksService; + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Task task, ReindexDataStreamRequest request, ActionListener listener) { + String sourceDataStreamName = request.getSourceDataStream(); + Metadata metadata = clusterService.state().metadata(); + DataStream dataStream = metadata.dataStreams().get(sourceDataStreamName); + if (dataStream == null) { + listener.onFailure(new ResourceNotFoundException("Data stream named [{}] does not exist", sourceDataStreamName)); + return; + } + int totalIndices = dataStream.getIndices().size(); + int totalIndicesToBeUpgraded = (int) dataStream.getIndices() + .stream() + .filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion()) + .count(); + ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( + sourceDataStreamName, + transportService.getThreadPool().absoluteTimeInMillis(), + totalIndices, + totalIndicesToBeUpgraded + ); + String persistentTaskId = getPersistentTaskId(sourceDataStreamName); + persistentTasksService.sendStartRequest( + persistentTaskId, + ReindexDataStreamTask.TASK_NAME, + params, + null, + ActionListener.wrap(startedTask -> listener.onResponse(new ReindexDataStreamResponse(persistentTaskId)), listener::onFailure) + ); + } + + private String getPersistentTaskId(String dataStreamName) throws ResourceAlreadyExistsException { + return "reindex-data-stream-" + dataStreamName; + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java new file mode 100644 index 0000000000000..f10d2e7b356fb --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; + +public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { + private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); + private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; + + public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) { + super(taskName, threadPool.generic()); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + @Override + protected ReindexDataStreamTask createTask( + long id, + String type, + String action, + TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers + ) { + ReindexDataStreamTaskParams params = taskInProgress.getParams(); + return new ReindexDataStreamTask( + params.startTime(), + params.totalIndices(), + params.totalIndicesToBeUpgraded(), + threadPool, + id, + type, + action, + "id=" + taskInProgress.getId(), + parentTaskId, + headers + ); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) { + String sourceDataStream = params.getSourceDataStream(); + GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream }); + assert task instanceof ReindexDataStreamTask; + final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task; + client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> { + List dataStreamInfos = response.getDataStreams(); + if (dataStreamInfos.size() == 1) { + List indices = dataStreamInfos.getFirst().getDataStream().getIndices(); + List indicesToBeReindexed = indices.stream() + .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) + .toList(); + reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + for (Index index : indicesToBeReindexed) { + // TODO This is just a placeholder. This is where the real data stream reindex logic will go + } + + completeSuccessfulPersistentTask(reindexDataStreamTask); + } else { + completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist")); + } + }, reindexDataStreamTask::markAsFailed)); + } + + private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { + persistentTask.reindexSucceeded(); + threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); + } + + private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { + persistentTask.reindexFailed(e); + threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); + } + + private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) { + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask( + reindexDataStreamTask.getPersistentTaskId() + ); + PersistentTaskState state = persistentTask.getState(); + final long completionTime; + if (state == null) { + completionTime = threadPool.absoluteTimeInMillis(); + reindexDataStreamTask.updatePersistentTaskState( + new ReindexDataStreamPersistentTaskState(completionTime), + ActionListener.noop() + ); + } else { + completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime(); + } + return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java new file mode 100644 index 0000000000000..d6f32a3d34a7a --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskState.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState { + public static final String NAME = ReindexDataStreamTask.TASK_NAME; + private static final String COMPLETION_TIME_FIELD = "completion_time"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + true, + args -> new ReindexDataStreamPersistentTaskState((long) args[0]) + ); + static { + PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD)); + } + + public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException { + this(in.readLong()); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(completionTime); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(COMPLETION_TIME_FIELD, completionTime); + builder.endObject(); + return builder; + } + + public static ReindexDataStreamPersistentTaskState fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java new file mode 100644 index 0000000000000..10dfded853a13 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatus.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public record ReindexDataStreamStatus( + long persistentTaskStartTime, + int totalIndices, + int totalIndicesToBeUpgraded, + boolean complete, + Exception exception, + int inProgress, + int pending, + List> errors +) implements Task.Status { + public ReindexDataStreamStatus { + Objects.requireNonNull(errors); + } + + public static final String NAME = "ReindexDataStreamStatus"; + + public ReindexDataStreamStatus(StreamInput in) throws IOException { + this( + in.readLong(), + in.readInt(), + in.readInt(), + in.readBoolean(), + in.readException(), + in.readInt(), + in.readInt(), + in.readCollectionAsList(in1 -> Tuple.tuple(in1.readString(), in1.readException())) + ); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(persistentTaskStartTime); + out.writeInt(totalIndices); + out.writeInt(totalIndicesToBeUpgraded); + out.writeBoolean(complete); + out.writeException(exception); + out.writeInt(inProgress); + out.writeInt(pending); + out.writeCollection(errors, (out1, tuple) -> { + out1.writeString(tuple.v1()); + out1.writeException(tuple.v2()); + }); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("start_time", persistentTaskStartTime); + builder.field("complete", complete); + builder.field("total_indices", totalIndices); + builder.field("total_indices_requiring_upgrade", totalIndicesToBeUpgraded); + builder.field("successes", totalIndicesToBeUpgraded - (inProgress + pending + errors.size())); + builder.field("in_progress", inProgress); + builder.field("pending", pending); + builder.startArray("errors"); + for (Tuple error : errors) { + builder.startObject(); + builder.field("index", error.v1()); + builder.field("message", error.v2().getMessage()); + builder.endObject(); + } + builder.endArray(); + if (exception != null) { + builder.field("exception", exception.getMessage()); + } + builder.endObject(); + return builder; + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java new file mode 100644 index 0000000000000..2ae244679659f --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.core.Tuple; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ReindexDataStreamTask extends AllocatedPersistentTask { + public static final String TASK_NAME = "reindex-data-stream"; + private final long persistentTaskStartTime; + private final int totalIndices; + private final int totalIndicesToBeUpgraded; + private final ThreadPool threadPool; + private boolean complete = false; + private Exception exception; + private List inProgress = new ArrayList<>(); + private List pending = List.of(); + private List> errors = new ArrayList<>(); + + public ReindexDataStreamTask( + long persistentTaskStartTime, + int totalIndices, + int totalIndicesToBeUpgraded, + ThreadPool threadPool, + long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers + ) { + super(id, type, action, description, parentTask, headers); + this.persistentTaskStartTime = persistentTaskStartTime; + this.totalIndices = totalIndices; + this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded; + this.threadPool = threadPool; + } + + @Override + public ReindexDataStreamStatus getStatus() { + return new ReindexDataStreamStatus( + persistentTaskStartTime, + totalIndices, + totalIndicesToBeUpgraded, + complete, + exception, + inProgress.size(), + pending.size(), + errors + ); + } + + public void reindexSucceeded() { + this.complete = true; + } + + public void reindexFailed(Exception e) { + this.complete = true; + this.exception = e; + } + + public void setInProgressIndices(List inProgressIndices) { + this.inProgress = inProgressIndices; + } + + public void setPendingIndices(List pendingIndices) { + this.pending = pendingIndices; + } + + public void addErrorIndex(String index, Exception error) { + this.errors.add(Tuple.tuple(index, error)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java new file mode 100644 index 0000000000000..5efbc6b672216 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParams.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded) + implements + PersistentTaskParams { + + public static final String NAME = ReindexDataStreamTask.TASK_NAME; + private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream"; + private static final String START_TIME_FIELD = "start_time"; + private static final String TOTAL_INDICES_FIELD = "total_indices"; + private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + true, + args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3]) + ); + static { + PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD)); + PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD)); + } + + public ReindexDataStreamTaskParams(StreamInput in) throws IOException { + this(in.readString(), in.readLong(), in.readInt(), in.readInt()); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.REINDEX_DATA_STREAMS; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(sourceDataStream); + out.writeLong(startTime); + out.writeInt(totalIndices); + out.writeInt(totalIndicesToBeUpgraded); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject() + .field(SOURCE_DATA_STREAM_FIELD, sourceDataStream) + .field(START_TIME_FIELD, startTime) + .field(TOTAL_INDICES_FIELD, totalIndices) + .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded) + .endObject(); + } + + public String getSourceDataStream() { + return sourceDataStream; + } + + public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java new file mode 100644 index 0000000000000..be11bff131909 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskStateTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +public class ReindexDataStreamPersistentTaskStateTests extends AbstractXContentSerializingTestCase { + @Override + protected ReindexDataStreamPersistentTaskState doParseInstance(XContentParser parser) throws IOException { + return ReindexDataStreamPersistentTaskState.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamPersistentTaskState::new; + } + + @Override + protected ReindexDataStreamPersistentTaskState createTestInstance() { + return new ReindexDataStreamPersistentTaskState(randomNegativeLong()); + } + + @Override + protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException { + return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java new file mode 100644 index 0000000000000..8f0fabc2ce7ee --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamStatusTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static java.util.Map.entry; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamStatusTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamStatus::new; + } + + @Override + protected ReindexDataStreamStatus createTestInstance() { + return new ReindexDataStreamStatus( + randomLong(), + randomNegativeInt(), + randomNegativeInt(), + randomBoolean(), + nullableTestException(), + randomNegativeInt(), + randomNegativeInt(), + randomErrorList() + ); + } + + private Exception nullableTestException() { + if (randomBoolean()) { + return testException(); + } + return null; + } + + private Exception testException() { + /* + * Unfortunately ElasticsearchException doesn't have an equals and just falls back to Object::equals. So we can't test for equality + * when we're using an exception. So always just use null. + */ + return null; + } + + private List randomList() { + return randomList(0); + } + + private List randomList(int minSize) { + return randomList(minSize, Math.max(minSize, 100), () -> randomAlphaOfLength(50)); + } + + private List> randomErrorList() { + return randomErrorList(0); + } + + private List> randomErrorList(int minSize) { + return randomList(minSize, Math.max(minSize, 100), () -> Tuple.tuple(randomAlphaOfLength(30), testException())); + } + + @Override + protected ReindexDataStreamStatus mutateInstance(ReindexDataStreamStatus instance) throws IOException { + long startTime = instance.persistentTaskStartTime(); + int totalIndices = instance.totalIndices(); + int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded(); + boolean complete = instance.complete(); + Exception exception = instance.exception(); + int inProgress = instance.inProgress(); + int pending = instance.pending(); + List> errors = instance.errors(); + switch (randomIntBetween(0, 6)) { + case 0 -> startTime = randomLong(); + case 1 -> totalIndices = totalIndices + 1; + case 2 -> totalIndicesToBeUpgraded = totalIndicesToBeUpgraded + 1; + case 3 -> complete = complete == false; + case 4 -> inProgress = inProgress + 1; + case 5 -> pending = pending + 1; + case 6 -> errors = randomErrorList(errors.size() + 1); + default -> throw new UnsupportedOperationException(); + } + return new ReindexDataStreamStatus( + startTime, + totalIndices, + totalIndicesToBeUpgraded, + complete, + exception, + inProgress, + pending, + errors + ); + } + + public void testToXContent() throws IOException { + ReindexDataStreamStatus status = new ReindexDataStreamStatus( + 1234L, + 200, + 100, + true, + new ElasticsearchException("the whole task failed"), + 12, + 8, + List.of( + Tuple.tuple("index7", new ElasticsearchException("index7 failed")), + Tuple.tuple("index8", new ElasticsearchException("index8 " + "failed")) + ) + ); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + status.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + Map parserMap = parser.map(); + assertThat( + parserMap, + equalTo( + Map.ofEntries( + entry("start_time", 1234), + entry("total_indices", 200), + entry("total_indices_requiring_upgrade", 100), + entry("complete", true), + entry("exception", "the whole task failed"), + entry("successes", 78), + entry("in_progress", 12), + entry("pending", 8), + entry( + "errors", + List.of( + Map.of("index", "index7", "message", "index7 failed"), + Map.of("index", "index8", "message", "index8 failed") + ) + ) + ) + ) + ); + } + } + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java new file mode 100644 index 0000000000000..55098bf4a68d5 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/task/ReindexDataStreamTaskParamsTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.task; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamTaskParams::new; + } + + @Override + protected ReindexDataStreamTaskParams createTestInstance() { + return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt()); + } + + @Override + protected ReindexDataStreamTaskParams mutateInstance(ReindexDataStreamTaskParams instance) { + String sourceDataStream = instance.sourceDataStream(); + long startTime = instance.startTime(); + int totalIndices = instance.totalIndices(); + int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded(); + switch (randomIntBetween(0, 3)) { + case 0 -> sourceDataStream = randomAlphaOfLength(50); + case 1 -> startTime = randomLong(); + case 2 -> totalIndices = totalIndices + 1; + case 3 -> totalIndices = totalIndicesToBeUpgraded + 1; + default -> throw new UnsupportedOperationException(); + } + return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded); + } + + @Override + protected ReindexDataStreamTaskParams doParseInstance(XContentParser parser) { + return ReindexDataStreamTaskParams.fromXContent(parser); + } + + public void testToXContent() throws IOException { + ReindexDataStreamTaskParams params = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + params.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + Map parserMap = parser.map(); + assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream())); + assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime())); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 95fffb1fe8224..688d2aaf905a6 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -205,6 +205,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0); public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0); public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0); + public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java new file mode 100644 index 0000000000000..814c512c43bec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ReindexDataStreamAction.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class ReindexDataStreamAction extends ActionType { + + public static final ReindexDataStreamAction INSTANCE = new ReindexDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/reindex"; + + public ReindexDataStreamAction() { + super(NAME); + } + + public static class ReindexDataStreamResponse extends ActionResponse implements ToXContentObject { + private final String taskId; + + public ReindexDataStreamResponse(String taskId) { + super(); + this.taskId = taskId; + } + + public ReindexDataStreamResponse(StreamInput in) throws IOException { + super(in); + this.taskId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(taskId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("task", getTaskId()); + builder.endObject(); + return builder; + } + + public String getTaskId() { + return taskId; + } + + @Override + public int hashCode() { + return Objects.hashCode(taskId); + } + + @Override + public boolean equals(Object other) { + return other instanceof ReindexDataStreamResponse && taskId.equals(((ReindexDataStreamResponse) other).taskId); + } + + } + + public static class ReindexDataStreamRequest extends ActionRequest { + private final String sourceDataStream; + + public ReindexDataStreamRequest(String sourceDataStream) { + super(); + this.sourceDataStream = sourceDataStream; + } + + public ReindexDataStreamRequest(StreamInput in) throws IOException { + super(in); + this.sourceDataStream = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceDataStream); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; // do not wait_for_completion + } + + public String getSourceDataStream() { + return sourceDataStream; + } + + @Override + public int hashCode() { + return Objects.hashCode(sourceDataStream); + } + + @Override + public boolean equals(Object other) { + return other instanceof ReindexDataStreamRequest + && sourceDataStream.equals(((ReindexDataStreamRequest) other).sourceDataStream); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java new file mode 100644 index 0000000000000..fe839c28aab88 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ReindexDataStreamResponseTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.datastreams.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDataStreamResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return ReindexDataStreamResponse::new; + } + + @Override + protected ReindexDataStreamResponse createTestInstance() { + return new ReindexDataStreamResponse(randomAlphaOfLength(40)); + } + + @Override + protected ReindexDataStreamResponse mutateInstance(ReindexDataStreamResponse instance) { + return createTestInstance(); + } + + public void testToXContent() throws IOException { + ReindexDataStreamResponse response = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) { + builder.humanReadable(true); + response.toXContent(builder, EMPTY_PARAMS); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + assertThat(parser.map(), equalTo(Map.of("task", response.getTaskId()))); + } + } + } +}