Skip to content

Commit

Permalink
Add support for selective loading of lookups in the task layer (apach…
Browse files Browse the repository at this point in the history
…e#16328)

Changes:
- Add `LookupLoadingSpec` to support 3 modes of lookup loading: ALL, NONE, ONLY_REQUIRED
- Add method `Task.getLookupLoadingSpec()`
- Do not load any lookups for `KillUnusedSegmentsTask`
  • Loading branch information
Akshat-Jain authored Apr 29, 2024
1 parent 9aef8e0 commit 9d2cae4
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
Expand Down Expand Up @@ -339,4 +340,10 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
);
return taskLockMap;
}

@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -331,4 +333,10 @@ static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task,
taskInfo.getTask().getMetadata()
);
}

@Nullable
default LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.ALL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
import org.hamcrest.MatcherAssert;
Expand Down Expand Up @@ -423,6 +424,17 @@ public void testGetInputSourceResources()
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}

@Test
public void testGetLookupsToLoad()
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2019-03-01/2019-04-01"))
.markAsUnused(true)
.build();
Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode());
}

@Test
public void testKillBatchSizeOneAndLimit4() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;

class LookupListeningAnnouncerConfig
Expand Down Expand Up @@ -57,4 +58,9 @@ public String getLookupTier()
lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
);
}

public LookupLoadingSpec getLookupLoadingSpec()
{
return dataSourceTaskIdHolder.getLookupLoadingSpec();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

Expand All @@ -70,6 +72,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* This class provide a basic {@link LookupExtractorFactory} references manager. It allows basic operations fetching,
Expand Down Expand Up @@ -167,7 +170,7 @@ public void start() throws IOException
if (!Strings.isNullOrEmpty(lookupConfig.getSnapshotWorkingDir())) {
FileUtils.mkdirp(new File(lookupConfig.getSnapshotWorkingDir()));
}
loadAllLookupsAndInitStateRef();
loadLookupsAndInitStateRef();
if (!testMode) {
mainThread = Execs.makeThread(
"LookupExtractorFactoryContainerProvider-MainThread",
Expand Down Expand Up @@ -373,10 +376,26 @@ private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap
}
}

private void loadAllLookupsAndInitStateRef()
/**
* Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}.
*/
private void loadLookupsAndInitStateRef()
{
List<LookupBean> lookupBeanList = getLookupsList();
if (lookupBeanList != null) {
LookupLoadingSpec lookupLoadingSpec = lookupListeningAnnouncerConfig.getLookupLoadingSpec();
LOG.info("Loading lookups using spec[%s].", lookupLoadingSpec);
List<LookupBean> lookupBeanList;
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE) {
lookupBeanList = Collections.emptyList();
} else {
lookupBeanList = getLookupsList();
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED && lookupBeanList != null) {
lookupBeanList = lookupBeanList.stream()
.filter(lookupBean -> lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName()))
.collect(Collectors.toList());
}
}

if (lookupBeanList != null && !lookupBeanList.isEmpty()) {
startLookups(lookupBeanList);
} else {
LOG.debug("No lookups to be loaded at this point.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.lookup.cache;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;

import java.util.Set;

/**
* This class defines the spec for loading of lookups for a given task. It contains 2 fields:
* <ol>
* <li>{@link LookupLoadingSpec#mode}: This mode defines whether lookups need to be
* loaded for the given task, or not. It can take 3 values: </li>
* <ul>
* <li> ALL: Load all the lookups.</li>
* <li> NONE: Load no lookups. </li>
* <li> ONLY_REQUIRED: Load only the lookups defined in lookupsToLoad </li>
* </ul>
* <li>{@link LookupLoadingSpec#lookupsToLoad}: Defines the lookups to load when the lookupLoadingMode is set to ONLY_REQUIRED.</li>
* </ol>
*/
public class LookupLoadingSpec
{
public enum Mode
{
ALL, NONE, ONLY_REQUIRED
}

private final Mode mode;
private final ImmutableSet<String> lookupsToLoad;

public static final LookupLoadingSpec ALL = new LookupLoadingSpec(Mode.ALL, null);
public static final LookupLoadingSpec NONE = new LookupLoadingSpec(Mode.NONE, null);

private LookupLoadingSpec(Mode mode, Set<String> lookupsToLoad)
{
this.mode = mode;
this.lookupsToLoad = lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad);
}

/**
* Creates a LookupLoadingSpec which loads only the lookups present in the given set.
*/
public static LookupLoadingSpec loadOnly(Set<String> lookupsToLoad)
{
if (lookupsToLoad == null) {
throw InvalidInput.exception("Expected non-null set of lookups to load.");
}
return new LookupLoadingSpec(Mode.ONLY_REQUIRED, lookupsToLoad);
}

public Mode getMode()
{
return mode;
}

/**
* @return A non-null immutable set of lookup names when {@link LookupLoadingSpec#mode} is ONLY_REQUIRED, null otherwise.
*/
public ImmutableSet<String> getLookupsToLoad()
{
return lookupsToLoad;
}

@Override
public String toString()
{
return "LookupLoadingSpec{" +
"mode=" + mode +
", lookupsToLoad=" + lookupsToLoad +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,27 @@

import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;

import javax.annotation.Nullable;

public class DataSourceTaskIdHolder
{
public static final String DATA_SOURCE_BINDING = "druidDataSource";
public static final String TASK_ID_BINDING = "druidTaskId";
public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask";
@Named(DATA_SOURCE_BINDING)
@Inject(optional = true)
String dataSource = null;
@Named(TASK_ID_BINDING)
@Inject(optional = true)
String taskId = null;

@Nullable
@Named(LOOKUPS_TO_LOAD_FOR_TASK)
@Inject(optional = true)
LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL;

public String getDataSource()
{
return dataSource;
Expand All @@ -42,4 +51,9 @@ public String getTaskId()
{
return taskId;
}

public LookupLoadingSpec getLookupLoadingSpec()
{
return lookupLoadingSpec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class LookupListeningAnnouncerConfigTest
Expand All @@ -57,6 +61,11 @@ public void configure(Binder binder)
binder
.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)))
.toInstance("some_datasource");

final List<String> lookupsToLoad = Arrays.asList("lookupName1", "lookupName2");
binder.bind(new TypeLiteral<List<String>>() {})
.annotatedWith(Names.named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK))
.toInstance(lookupsToLoad);
}
},
new LookupModule()
Expand Down Expand Up @@ -127,6 +136,14 @@ public void testDatasourceInjection()
Assert.assertEquals("some_datasource", config.getLookupTier());
}

@Test
public void testLookupsToLoadInjection()
{
final DataSourceTaskIdHolder dimensionIdHolder = new DataSourceTaskIdHolder();
injector.injectMembers(dimensionIdHolder);
Assert.assertEquals(LookupLoadingSpec.Mode.ALL, dimensionIdHolder.getLookupLoadingSpec().getMode());
}

@Test(expected = IllegalArgumentException.class)
public void testFailsInjection()
{
Expand Down
Loading

0 comments on commit 9d2cae4

Please sign in to comment.