From 6095e0c837637d5c4a559bfdb677698e4b5a3704 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 27 Aug 2024 10:46:07 -0400 Subject: [PATCH 1/3] WIP to show Geospatial plugin using LockService instance from JS Signed-off-by: Craig Perkins --- .../geospatial/plugin/GeospatialPlugin.java | 65 ++++++++++++++++++- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index d64f20b4..2b391d4f 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -9,17 +9,24 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import org.opensearch.action.ActionRequest; +import org.opensearch.action.search.PitService; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.MapBuilder; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; @@ -30,6 +37,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction; import org.opensearch.geospatial.index.mapper.xypoint.XYPointFieldMapper; @@ -71,9 +79,12 @@ import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction; import org.opensearch.index.IndexModule; import org.opensearch.index.mapper.Mapper; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.ingest.Processor; +import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.IngestPlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.Plugin; @@ -85,6 +96,8 @@ import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteClusterService; +import org.opensearch.transport.TransportService; import org.opensearch.watcher.ResourceWatcherService; import lombok.extern.log4j.Log4j2; @@ -94,7 +107,7 @@ * to interact with Cluster. */ @Log4j2 -public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin { +public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin, ClusterPlugin { private Ip2GeoCachedDao ip2GeoCachedDao; private DatasourceDao datasourceDao; private GeoIpDataDao geoIpDataDao; @@ -127,7 +140,10 @@ public void onIndexModule(IndexModule indexModule) { @Override public Collection> getGuiceServiceClasses() { - return List.of(Ip2GeoListener.class); + final List> services = new ArrayList<>(1); + services.add(GuiceHolder.class); + services.add(Ip2GeoListener.class); + return services; } @Override @@ -164,6 +180,8 @@ public Collection createComponents( ); Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool); Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client); + System.out.println("createComponents"); + System.out.println("LockService: " + GuiceHolder.getLockService()); /** * We don't need to return datasource runner because it is used only by job scheduler and job scheduler * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. @@ -257,4 +275,47 @@ public List getAggregations() { return List.of(geoHexGridSpec); } + + @Override + public void onNodeStarted(DiscoveryNode localNode) { + System.out.println("onNodeStarted"); + System.out.println("LockService: " + GuiceHolder.getLockService()); + } + + public static class GuiceHolder implements LifecycleComponent { + + private static LockService lockService; + + @Inject + public GuiceHolder( + final LockService lockService + ) { + GuiceHolder.lockService = lockService; + } + + public static LockService getLockService() { + return lockService; + } + + @Override + public void close() {} + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) {} + + @Override + public void removeLifecycleListener(LifecycleListener listener) {} + + @Override + public void start() {} + + @Override + public void stop() {} + + } } From 6a5d01b85419319c0063ed76bd207732a9c02766 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 27 Aug 2024 11:05:06 -0400 Subject: [PATCH 2/3] Use instance of LockService from Guice that is instantiated by Job Scheduler Signed-off-by: Craig Perkins --- .../ip2geo/common/Ip2GeoLockService.java | 15 ++++- .../geospatial/plugin/GeospatialPlugin.java | 55 +++++++++---------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java index 89174f95..81a4e9e7 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -30,7 +30,7 @@ public class Ip2GeoLockService { public static final long LOCK_DURATION_IN_SECONDS = 300l; public static final long RENEW_AFTER_IN_SECONDS = 120l; private final ClusterService clusterService; - private final LockService lockService; + private LockService lockService; /** * Constructor @@ -43,6 +43,19 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien this.lockService = new LockService(client, clusterService); } + /** + * Constructor + * + * @param clusterService the cluster service + */ + public Ip2GeoLockService(final ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void initialize(final LockService lockService) { + this.lockService = lockService; + } + /** * Wrapper method of LockService#acquireLockWithId * diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 2b391d4f..8882bb72 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -9,14 +9,11 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Supplier; import org.opensearch.action.ActionRequest; -import org.opensearch.action.search.PitService; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; @@ -37,7 +34,6 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.extensions.ExtensionsManager; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction; import org.opensearch.geospatial.index.mapper.xypoint.XYPointFieldMapper; @@ -79,7 +75,6 @@ import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction; import org.opensearch.index.IndexModule; import org.opensearch.index.mapper.Mapper; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.ingest.Processor; import org.opensearch.jobscheduler.spi.utils.LockService; @@ -96,8 +91,6 @@ import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteClusterService; -import org.opensearch.transport.TransportService; import org.opensearch.watcher.ResourceWatcherService; import lombok.extern.log4j.Log4j2; @@ -107,11 +100,22 @@ * to interact with Cluster. */ @Log4j2 -public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin, ClusterPlugin { +public class GeospatialPlugin extends Plugin + implements + IngestPlugin, + ActionPlugin, + MapperPlugin, + SearchPlugin, + SystemIndexPlugin, + ClusterPlugin { private Ip2GeoCachedDao ip2GeoCachedDao; private DatasourceDao datasourceDao; private GeoIpDataDao geoIpDataDao; private URLDenyListChecker urlDenyListChecker; + private ClusterService clusterService; + private Ip2GeoLockService ip2GeoLockService; + private Ip2GeoExecutor ip2GeoExecutor; + private DatasourceUpdateService datasourceUpdateService; @Override public Collection getSystemIndexDescriptors(Settings settings) { @@ -172,22 +176,10 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService( - clusterService, - datasourceDao, - geoIpDataDao, - urlDenyListChecker - ); - Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool); - Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client); - System.out.println("createComponents"); - System.out.println("LockService: " + GuiceHolder.getLockService()); - /** - * We don't need to return datasource runner because it is used only by job scheduler and job scheduler - * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. - */ - DatasourceRunner.getJobRunnerInstance() - .initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService); + this.clusterService = clusterService; + this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker); + this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool); + this.ip2GeoLockService = new Ip2GeoLockService(clusterService); return List.of( UploadStats.getInstance(), @@ -278,8 +270,15 @@ public List getAggregations() { @Override public void onNodeStarted(DiscoveryNode localNode) { - System.out.println("onNodeStarted"); - System.out.println("LockService: " + GuiceHolder.getLockService()); + LockService lockService = GuiceHolder.getLockService(); + ip2GeoLockService.initialize(lockService); + + /** + * We don't need to return datasource runner because it is used only by job scheduler and job scheduler + * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. + */ + DatasourceRunner.getJobRunnerInstance() + .initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService); } public static class GuiceHolder implements LifecycleComponent { @@ -287,9 +286,7 @@ public static class GuiceHolder implements LifecycleComponent { private static LockService lockService; @Inject - public GuiceHolder( - final LockService lockService - ) { + public GuiceHolder(final LockService lockService) { GuiceHolder.lockService = lockService; } From bc6544bae9777e6028611b1796ebe344cebaa13b Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 5 Sep 2024 08:34:19 -0400 Subject: [PATCH 3/3] Fix failing tests Signed-off-by: Craig Perkins --- .../geospatial/plugin/GeospatialPlugin.java | 4 +-- .../geospatial/ClusterSettingHelper.java | 3 +- .../geospatial/TestGeospatialPlugin.java | 29 +++++++++++++++++++ .../plugin/GeospatialPluginTests.java | 2 +- 4 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 8882bb72..14f4ac42 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -144,9 +144,9 @@ public void onIndexModule(IndexModule indexModule) { @Override public Collection> getGuiceServiceClasses() { - final List> services = new ArrayList<>(1); - services.add(GuiceHolder.class); + final List> services = new ArrayList<>(2); services.add(Ip2GeoListener.class); + services.add(GuiceHolder.class); return services; } diff --git a/src/test/java/org/opensearch/geospatial/ClusterSettingHelper.java b/src/test/java/org/opensearch/geospatial/ClusterSettingHelper.java index 93bde1b6..c979a075 100644 --- a/src/test/java/org/opensearch/geospatial/ClusterSettingHelper.java +++ b/src/test/java/org/opensearch/geospatial/ClusterSettingHelper.java @@ -24,7 +24,6 @@ import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; -import org.opensearch.geospatial.plugin.GeospatialPlugin; import org.opensearch.node.MockNode; import org.opensearch.node.Node; import org.opensearch.plugins.Plugin; @@ -49,7 +48,7 @@ private List> basePlugins() { List> plugins = new ArrayList<>(); plugins.add(getTestTransportPlugin()); plugins.add(MockHttpTransport.TestPlugin.class); - plugins.add(GeospatialPlugin.class); + plugins.add(TestGeospatialPlugin.class); return plugins; } diff --git a/src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java b/src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java new file mode 100644 index 00000000..291c384a --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener; +import org.opensearch.geospatial.plugin.GeospatialPlugin; + +/** + * This class is needed for ClusterSettingsHelper.createMockNode to instantiate a test instance of the + * GeospatialPlugin without the JobSchedulerPlugin installed. Without overriding this class, the + * GeospatialPlugin would try to Inject JobScheduler's LockService in the GuiceHolder which will + * fail because JobScheduler is not installed + */ +public class TestGeospatialPlugin extends GeospatialPlugin { + @Override + public Collection> getGuiceServiceClasses() { + final List> services = new ArrayList<>(1); + services.add(Ip2GeoListener.class); + return services; + } +} diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 5bfb489d..0ac04e29 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -167,7 +167,7 @@ public void testCreateComponents() { } public void testGetGuiceServiceClasses() { - Collection> classes = List.of(Ip2GeoListener.class); + Collection> classes = List.of(Ip2GeoListener.class, GeospatialPlugin.GuiceHolder.class); assertEquals(classes, plugin.getGuiceServiceClasses()); }