From 9d2cae40c3fd941658641d8a753735ec6d74340e Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 29 Apr 2024 07:19:59 +0530 Subject: [PATCH] Add support for selective loading of lookups in the task layer (#16328) Changes: - Add `LookupLoadingSpec` to support 3 modes of lookup loading: ALL, NONE, ONLY_REQUIRED - Add method `Task.getLookupLoadingSpec()` - Do not load any lookups for `KillUnusedSegmentsTask` --- .../common/task/KillUnusedSegmentsTask.java | 7 ++ .../druid/indexing/common/task/Task.java | 8 ++ .../task/KillUnusedSegmentsTaskTest.java | 12 +++ .../LookupListeningAnnouncerConfig.java | 6 ++ .../query/lookup/LookupReferencesManager.java | 27 +++++- .../lookup/cache/LookupLoadingSpec.java | 91 +++++++++++++++++++ .../metrics/DataSourceTaskIdHolder.java | 14 +++ .../LookupListeningAnnouncerConfigTest.java | 17 ++++ .../lookup/LookupReferencesManagerTest.java | 78 ++++++++++++++++ .../lookup/cache/LookupLoadingSpecTest.java | 62 +++++++++++++ .../java/org/apache/druid/cli/CliPeon.java | 9 ++ .../planner/SqlResourceCollectorShuttle.java | 2 +- 12 files changed, 328 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java create mode 100644 server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index acd490a6d211..78a7abae7a6b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import 
org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CollectionUtils; @@ -339,4 +340,10 @@ private NavigableMap> getNonRevokedTaskLockMap(TaskActi ); return taskLockMap; } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + return LookupLoadingSpec.NONE; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index a5f34f9bbbfe..7556908023a4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -41,11 +41,13 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -331,4 +333,10 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo lookupMap } } - private void loadAllLookupsAndInitStateRef() + /** + * Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}. 
+ */ + private void loadLookupsAndInitStateRef() { - List lookupBeanList = getLookupsList(); - if (lookupBeanList != null) { + LookupLoadingSpec lookupLoadingSpec = lookupListeningAnnouncerConfig.getLookupLoadingSpec(); + LOG.info("Loading lookups using spec[%s].", lookupLoadingSpec); + List lookupBeanList; + if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE) { + lookupBeanList = Collections.emptyList(); + } else { + lookupBeanList = getLookupsList(); + if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED && lookupBeanList != null) { + lookupBeanList = lookupBeanList.stream() + .filter(lookupBean -> lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName())) + .collect(Collectors.toList()); + } + } + + if (lookupBeanList != null && !lookupBeanList.isEmpty()) { startLookups(lookupBeanList); } else { LOG.debug("No lookups to be loaded at this point."); diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java new file mode 100644 index 000000000000..88524fe27f96 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.lookup.cache; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.InvalidInput; + +import java.util.Set; + +/** + * This class defines the spec for loading of lookups for a given task. It contains 2 fields: + *
 <ol>
+ *   <li>{@link LookupLoadingSpec#mode}: This mode defines whether lookups need to be
+ *   loaded for the given task, or not. It can take 3 values: </li>
+ *   <ul>
+ *    <li> ALL: Load all the lookups.</li>
+ *    <li> NONE: Load no lookups. </li>
+ *    <li> ONLY_REQUIRED: Load only the lookups defined in lookupsToLoad </li>
+ *   </ul>
+ * <li>{@link LookupLoadingSpec#lookupsToLoad}: Defines the lookups to load when the lookupLoadingMode is set to ONLY_REQUIRED.</li>
+ * </ol>
+ */ +public class LookupLoadingSpec +{ + public enum Mode + { + ALL, NONE, ONLY_REQUIRED + } + + private final Mode mode; + private final ImmutableSet lookupsToLoad; + + public static final LookupLoadingSpec ALL = new LookupLoadingSpec(Mode.ALL, null); + public static final LookupLoadingSpec NONE = new LookupLoadingSpec(Mode.NONE, null); + + private LookupLoadingSpec(Mode mode, Set lookupsToLoad) + { + this.mode = mode; + this.lookupsToLoad = lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad); + } + + /** + * Creates a LookupLoadingSpec which loads only the lookups present in the given set. + */ + public static LookupLoadingSpec loadOnly(Set lookupsToLoad) + { + if (lookupsToLoad == null) { + throw InvalidInput.exception("Expected non-null set of lookups to load."); + } + return new LookupLoadingSpec(Mode.ONLY_REQUIRED, lookupsToLoad); + } + + public Mode getMode() + { + return mode; + } + + /** + * @return A non-null immutable set of lookup names when {@link LookupLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. 
+ */ + public ImmutableSet getLookupsToLoad() + { + return lookupsToLoad; + } + + @Override + public String toString() + { + return "LookupLoadingSpec{" + + "mode=" + mode + + ", lookupsToLoad=" + lookupsToLoad + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index eed680390303..6d2dafd31a55 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -21,11 +21,15 @@ import com.google.inject.Inject; import com.google.inject.name.Named; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; + +import javax.annotation.Nullable; public class DataSourceTaskIdHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; public static final String TASK_ID_BINDING = "druidTaskId"; + public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; @Named(DATA_SOURCE_BINDING) @Inject(optional = true) String dataSource = null; @@ -33,6 +37,11 @@ public class DataSourceTaskIdHolder @Inject(optional = true) String taskId = null; + @Nullable + @Named(LOOKUPS_TO_LOAD_FOR_TASK) + @Inject(optional = true) + LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; + public String getDataSource() { return dataSource; @@ -42,4 +51,9 @@ public String getTaskId() { return taskId; } + + public LookupLoadingSpec getLookupLoadingSpec() + { + return lookupLoadingSpec; + } } diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java index a0f3bc9ccacf..75cbfa2fd1e7 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java +++ 
b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java @@ -24,6 +24,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.TypeLiteral; import com.google.inject.name.Names; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.JsonConfigProvider; @@ -31,11 +32,14 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; +import java.util.List; import java.util.Properties; public class LookupListeningAnnouncerConfigTest @@ -57,6 +61,11 @@ public void configure(Binder binder) binder .bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING))) .toInstance("some_datasource"); + + final List lookupsToLoad = Arrays.asList("lookupName1", "lookupName2"); + binder.bind(new TypeLiteral>() {}) + .annotatedWith(Names.named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK)) + .toInstance(lookupsToLoad); } }, new LookupModule() @@ -127,6 +136,14 @@ public void testDatasourceInjection() Assert.assertEquals("some_datasource", config.getLookupTier()); } + @Test + public void testLookupsToLoadInjection() + { + final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder(); + injector.injectMembers(dimensionIdHolder); + Assert.assertEquals(LookupLoadingSpec.Mode.ALL, dimensionIdHolder.getLookupLoadingSpec().getMode()); + } + @Test(expected = IllegalArgumentException.class) public void testFailsInjection() { diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java index 
d004b7780d73..442a2b478f59 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; @@ -51,6 +52,7 @@ public class LookupReferencesManagerTest { private static final String LOOKUP_TIER = "lookupTier"; + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); LookupReferencesManager lookupReferencesManager; @@ -68,6 +70,7 @@ public void setUp() throws IOException druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); config = EasyMock.createMock(LookupListeningAnnouncerConfig.class); + EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes(); lookupExtractorFactory = new MapLookupExtractorFactory( ImmutableMap.of( @@ -765,6 +768,80 @@ public void testCoordinatorLookupSync() throws Exception } + private Map getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec lookupLoadingSpec) + throws Exception + { + LookupExtractorFactoryContainer container1 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory(ImmutableMap.of("key1", "value1"), true) + ); + + LookupExtractorFactoryContainer container2 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory(ImmutableMap.of("key2", "value2"), true + ) + ); + + LookupExtractorFactoryContainer container3 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory(ImmutableMap.of("key3", "value3"), true + ) + ); + EasyMock.reset(config); + 
EasyMock.reset(druidLeaderClient); + Map lookupMap = new HashMap<>(); + lookupMap.put("testLookup1", container1); + lookupMap.put("testLookup2", container2); + lookupMap.put("testLookup3", container3); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + EasyMock.expect(config.getLookupLoadingSpec()).andReturn(lookupLoadingSpec); + EasyMock.replay(config); + EasyMock.expect(druidLeaderClient.makeRequest( + HttpMethod.GET, + "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true" + )) + .andReturn(request); + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + newEmptyResponse(HttpResponseStatus.OK), + StandardCharsets.UTF_8 + ).addChunk(strResult); + EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); + EasyMock.replay(druidLeaderClient); + + lookupReferencesManager.start(); + return lookupMap; + } + + @Test + public void testCoordinatorLoadAllLookups() throws Exception + { + Map lookupMap = getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.ALL); + for (String lookupName : lookupMap.keySet()) { + Assert.assertEquals(Optional.of(lookupMap.get(lookupName)), lookupReferencesManager.get(lookupName)); + } + } + + @Test + public void testCoordinatorLoadNoLookups() throws Exception + { + Map lookupMap = getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.NONE); + for (String lookupName : lookupMap.keySet()) { + Assert.assertFalse(lookupReferencesManager.get(lookupName).isPresent()); + } + } + + @Test + public void testCoordinatorLoadSubsetOfLookups() throws Exception + { + Map lookupMap = + getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.loadOnly(ImmutableSet.of("testLookup1", "testLookup2"))); + Assert.assertEquals(Optional.of(lookupMap.get("testLookup1")), lookupReferencesManager.get("testLookup1")); + 
Assert.assertEquals(Optional.of(lookupMap.get("testLookup2")), lookupReferencesManager.get("testLookup2")); + Assert.assertFalse(lookupReferencesManager.get("testLookup3").isPresent()); + } + @Test public void testLoadLookupOnCoordinatorFailure() throws Exception { @@ -818,6 +895,7 @@ public int getCoordinatorRetryDelay() EasyMock.reset(config); EasyMock.reset(druidLeaderClient); EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); + EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes(); EasyMock.replay(config); EasyMock.expect(druidLeaderClient.makeRequest( HttpMethod.GET, diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java new file mode 100644 index 000000000000..8d0a7a5518a3 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.lookup.cache; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class LookupLoadingSpecTest +{ + @Test + public void testLoadingAllLookups() + { + LookupLoadingSpec spec = LookupLoadingSpec.ALL; + Assert.assertEquals(LookupLoadingSpec.Mode.ALL, spec.getMode()); + Assert.assertNull(spec.getLookupsToLoad()); + } + + @Test + public void testLoadingNoLookups() + { + LookupLoadingSpec spec = LookupLoadingSpec.NONE; + Assert.assertEquals(LookupLoadingSpec.Mode.NONE, spec.getMode()); + Assert.assertNull(spec.getLookupsToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookups() + { + Set lookupsToLoad = ImmutableSet.of("lookupName1", "lookupName2"); + LookupLoadingSpec spec = LookupLoadingSpec.loadOnly(lookupsToLoad); + Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(lookupsToLoad, spec.getLookupsToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookupsWithNullList() + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.loadOnly(null)); + Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 3247c973c340..1ca8ddf539fc 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -131,6 +131,7 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import 
org.apache.druid.server.metrics.ServiceStatusMonitor; import org.apache.druid.tasklogs.TaskPayloadManager; @@ -332,6 +333,14 @@ public String getTaskIDFromTask(final Task task) { return task.getId(); } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK) + public LookupLoadingSpec getLookupsToLoad(final Task task) + { + return task.getLookupLoadingSpec(); + } }, new QueryablePeonModule(), new IndexingServiceFirehoseModule(), diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index 850b472bc83c..631936972e10 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -42,7 +42,7 @@ * Walks an {@link SqlNode} to collect a set of {@link Resource} for {@link ResourceType#DATASOURCE} and * {@link ResourceType#VIEW} to use for authorization during query planning. * - * It works by looking for {@link SqlIdentifier} which corespond to a {@link IdentifierNamespace}, where + * It works by looking for {@link SqlIdentifier} which correspond to a {@link IdentifierNamespace}, where * {@link SqlValidatorNamespace} is calcite-speak for sources of data and {@link IdentifierNamespace} specifically are * namespaces which are identified by a single variable, e.g. table names. */