Merge remote-tracking branch 'upstream/master'
findingrish committed May 6, 2024
2 parents 1598bf9 + 2a638d7 commit 26eb44d
Showing 24 changed files with 1,036 additions and 304 deletions.
@@ -19,6 +19,8 @@

package org.apache.druid.server.http.catalog;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.http.TableEditRequest;
@@ -34,10 +36,12 @@
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.TestDerbyConnector;
import org.junit.After;
import org.junit.Before;
@@ -56,6 +60,7 @@

public class EditorTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();

@@ -326,7 +331,6 @@ public void testUpdateProperties() throws CatalogException
// Can't test an empty property set: no table type allows empty
// properties.

// Remove a required property
Map<String, Object> updates = new HashMap<>();
updates.put(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, null);
cmd = new UpdateProperties(updates);
@@ -374,6 +378,37 @@ public void testUpdateProperties() throws CatalogException
expected,
doEdit(tableName, cmd).spec().properties()
);

// Add a DESC cluster key - should fail
Map<String, Object> updates1 = new HashMap<>();
updates1.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)));

assertThrows(
CatalogException.class,
() -> new TableEditor(
catalog,
table.id(),
new UpdateProperties(updates1)
).go()
);

// Add an ASC cluster key - should succeed
updates = new HashMap<>();
updates.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)));
cmd = new UpdateProperties(updates);
expected = ImmutableMap.of(
DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H",
DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))
);
Map<String, Object> actual = doEdit(tableName, cmd).spec().properties();
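// Normalize the returned cluster-keys property into ClusterKeySpec objects so it can be compared with the expected map.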
actual.put(
DatasourceDefn.CLUSTER_KEYS_PROPERTY,
MAPPER.convertValue(actual.get(DatasourceDefn.CLUSTER_KEYS_PROPERTY), ClusterKeySpec.CLUSTER_KEY_LIST_TYPE_REF)
);
assertEquals(
expected,
actual
);
}

@Test
@@ -52,6 +52,7 @@
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorCursor;
import org.apache.druid.segment.vector.VectorOffset;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

@@ -207,69 +208,76 @@ public VectorCursor buildVectorized(final int vectorSize)
  final Closer closer = Closer.create();
  final ColumnCache columnCache = new ColumnCache(index, closer);

  // Wrap the remainder of cursor setup in a try, so if an error is encountered while setting it up, we don't
  // leak columns in the ColumnCache.
  try {
    final ColumnSelectorColumnIndexSelector bitmapIndexSelector = new ColumnSelectorColumnIndexSelector(
        index.getBitmapFactoryForDimensions(),
        virtualColumns,
        columnCache
    );

    final int numRows = index.getNumRows();
    final FilterBundle filterBundle = makeFilterBundle(bitmapIndexSelector, numRows);

    NumericColumn timestamps = null;

    final int startOffset;
    final int endOffset;

    if (interval.getStartMillis() > minDataTimestamp) {
      timestamps = (NumericColumn) columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);

      startOffset = timeSearch(timestamps, interval.getStartMillis(), 0, index.getNumRows());
    } else {
      startOffset = 0;
    }

    if (interval.getEndMillis() <= maxDataTimestamp) {
      if (timestamps == null) {
        timestamps = (NumericColumn) columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
      }

      endOffset = timeSearch(timestamps, interval.getEndMillis(), startOffset, index.getNumRows());
    } else {
      endOffset = index.getNumRows();
    }

    // filterBundle will only be null if the filter itself is null, otherwise check to see if the filter can use
    // an index
    final VectorOffset baseOffset =
        filterBundle == null || filterBundle.getIndex() == null
        ? new NoFilterVectorOffset(vectorSize, startOffset, endOffset)
        : new BitmapVectorOffset(vectorSize, filterBundle.getIndex().getBitmap(), startOffset, endOffset);

    // baseColumnSelectorFactory using baseOffset is the column selector for filtering.
    final VectorColumnSelectorFactory baseColumnSelectorFactory = makeVectorColumnSelectorFactoryForOffset(
        columnCache,
        baseOffset
    );

    // filterBundle will only be null if the filter itself is null, otherwise check to see if the filter needs to use
    // a value matcher
    if (filterBundle != null && filterBundle.getMatcherBundle() != null) {
      final VectorValueMatcher vectorValueMatcher = filterBundle.getMatcherBundle()
                                                                .vectorMatcher(baseColumnSelectorFactory, baseOffset);
      final VectorOffset filteredOffset = FilteredVectorOffset.create(
          baseOffset,
          vectorValueMatcher
      );

      // Now create the cursor and column selector that will be returned to the caller.
      final VectorColumnSelectorFactory filteredColumnSelectorFactory = makeVectorColumnSelectorFactoryForOffset(
          columnCache,
          filteredOffset
      );
      return new QueryableIndexVectorCursor(filteredColumnSelectorFactory, filteredOffset, vectorSize, closer);
    } else {
      return new QueryableIndexVectorCursor(baseColumnSelectorFactory, baseOffset, vectorSize, closer);
    }
  }
  catch (Throwable t) {
    throw CloseableUtils.closeAndWrapInCatch(t, closer);
  }
}
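For context, a minimal, self-contained sketch (not part of this commit) of the close-on-error idiom the change adopts. It assumes Druid's Closer (org.apache.druid.java.util.common.io.Closer) and the CloseableUtils imported above; the resources and class names here are placeholders, not anything from the commit.

import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.utils.CloseableUtils;

import java.io.ByteArrayInputStream;
import java.io.Closeable;

public class CloseOnErrorSketch
{
  // Register resources with a Closer while setting things up; if anything throws before
  // ownership is handed off, close everything registered so far and propagate the error.
  public static Closeable openResources()
  {
    final Closer closer = Closer.create();
    try {
      closer.register(new ByteArrayInputStream(new byte[]{1}));
      closer.register(new ByteArrayInputStream(new byte[]{2}));
      // ... further setup that might throw goes here ...
      return closer;  // on success, the caller owns the closer and everything registered with it
    }
    catch (Throwable t) {
      // Closes the registered resources, then rethrows t (wrapped if it is a checked exception).
      throw CloseableUtils.closeAndWrapInCatch(t, closer);
    }
  }

  public static void main(String[] args) throws Exception
  {
    try (Closeable resources = openResources()) {
      // use the resources while they are open
    }
  }
}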

@@ -110,6 +110,34 @@ public void validate(Object value, ObjectMapper jsonMapper)
}
}

public static class ClusterKeysDefn extends ModelProperties.ListPropertyDefn<ClusterKeySpec>
{
public ClusterKeysDefn()
{
super(
CLUSTER_KEYS_PROPERTY,
"ClusterKeySpec list",
new TypeReference<List<ClusterKeySpec>>() {}
);
}

@Override
public void validate(Object value, ObjectMapper jsonMapper)
{
if (value == null) {
return;
}
List<ClusterKeySpec> clusterKeys = decode(value, jsonMapper);
for (ClusterKeySpec clusterKey : clusterKeys) {
if (clusterKey.desc()) {
throw new IAE(
StringUtils.format("Cannot specify DESC clustering key [%s]. Only ASC is supported.", clusterKey)
);
}
}
}
}

public DatasourceDefn()
{
super(
@@ -118,11 +146,7 @@ public DatasourceDefn()
Arrays.asList(
new SegmentGranularityFieldDefn(),
new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
new ModelProperties.ListPropertyDefn<ClusterKeySpec>(
CLUSTER_KEYS_PROPERTY,
"cluster keys",
new TypeReference<List<ClusterKeySpec>>() { }
),
new ClusterKeysDefn(),
new HiddenColumnsDefn(),
new ModelProperties.BooleanPropertyDefn(SEALED_PROPERTY)
),
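As a quick illustration (not part of the commit), the new ClusterKeysDefn can be exercised directly. This fragment assumes the same DefaultObjectMapper, ImmutableList, and ClusterKeySpec used elsewhere in this change, plus org.apache.druid.java.util.common.IAE; it mirrors the behavior covered by the tests below.

ObjectMapper jsonMapper = new DefaultObjectMapper();
DatasourceDefn.ClusterKeysDefn clusterKeysDefn = new DatasourceDefn.ClusterKeysDefn();

// ASC keys (desc = false) pass validation.
clusterKeysDefn.validate(ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)), jsonMapper);

// A DESC key (desc = true) is rejected with an IAE ("Cannot specify DESC clustering key ...").
try {
  clusterKeysDefn.validate(ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)), jsonMapper);
}
catch (IAE expected) {
  // expected: only ASC cluster keys are supported in the catalog
}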
@@ -266,12 +266,6 @@ public int getMaxKillTaskSlots()
return maxKillTaskSlots;
}

@JsonIgnore
public boolean isKillUnusedSegmentsInAllDataSources()
{
return specificDataSourcesToKillUnusedSegmentsIn.isEmpty();
}

@JsonProperty("killPendingSegmentsSkipList")
public Set<String> getDataSourcesToNotKillStalePendingSegmentsIn()
{
@@ -20,6 +20,7 @@
package org.apache.druid.catalog.model.table;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.catalog.CatalogTest;
@@ -116,6 +117,28 @@ public void testEmptySpec()
}
}

@Test
public void testSpecWithClusterKeyProp()
{
{
TableSpec spec = new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true))),
null
);
expectValidationFails(spec);
}

{
TableSpec spec = new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))),
null
);
expectValidationSucceeds(spec);
}
}

@Test
public void testAllProperties()
{
@@ -125,13 +148,15 @@ public void testAllProperties()
.put(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
.put(DatasourceDefn.HIDDEN_COLUMNS_PROPERTY, Arrays.asList("foo", "bar"))
.put(DatasourceDefn.SEALED_PROPERTY, true)
.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)))
.build();

TableSpec spec = new TableSpec(DatasourceDefn.TABLE_TYPE, props, null);
DatasourceFacade facade = new DatasourceFacade(registry.resolve(spec));
assertEquals("P1D", facade.segmentGranularityString());
assertEquals(1_000_000, (int) facade.targetSegmentRows());
assertEquals(Arrays.asList("foo", "bar"), facade.hiddenColumns());
assertEquals(Collections.singletonList(new ClusterKeySpec("clusterKeyA", false)), facade.clusterKeys());
assertTrue(facade.isSealed());
}
