Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into iot_rollup
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr authored Oct 18, 2024
2 parents c47e692 + 59d3326 commit 9dd88f1
Show file tree
Hide file tree
Showing 18 changed files with 330 additions and 101 deletions.
10 changes: 8 additions & 2 deletions be/src/vec/functions/function_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,12 @@ class FunctionJsonAlwaysNotNullable : public IFunction {
struct FunctionJsonQuoteImpl {
static constexpr auto name = "json_quote";

// Result type of json_quote: a string type, wrapped as nullable when the first
// argument type is present, non-null and itself nullable.
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
    const bool first_arg_nullable =
            !arguments.empty() && arguments[0] && arguments[0]->is_nullable();
    DataTypePtr string_type = std::make_shared<DataTypeString>();
    return first_arg_nullable ? make_nullable(string_type) : string_type;
}
static void execute(const std::vector<const ColumnString*>& data_columns,
ColumnString& result_column, size_t input_rows_count) {
rapidjson::Document document;
Expand All @@ -810,13 +816,13 @@ struct FunctionJsonQuoteImpl {
rapidjson::Value value;

rapidjson::StringBuffer buf;
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);

for (int i = 0; i < input_rows_count; i++) {
StringRef data = data_columns[0]->get_data_at(i);
value.SetString(data.data, data.size, allocator);

buf.Clear();
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
value.Accept(writer);
result_column.insert_data(buf.GetString(), buf.GetSize());
}
Expand Down Expand Up @@ -893,7 +899,7 @@ class FunctionJson : public IFunction {
bool is_variadic() const override { return true; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeString>();
return Impl::get_return_type_impl(arguments);
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,23 @@ public long getRowCount(boolean singleReplica) {
return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum();
}

// Returns the smallest row count among the replicas that are alive, have caught up to
// `version`, and carry the highest replica version seen. Returns 0 when no replica qualifies.
// Among replicas at the same version the smaller row count is preferred: per the original
// rationale, that replica has performed the most compaction and is the most accurate.
public long getMinReplicaRowCount(long version) {
    long bestRowCount = Long.MAX_VALUE;
    long bestVersion = 0;
    for (Replica replica : replicas) {
        // Only alive replicas that caught up to the requested version are candidates.
        if (!replica.isAlive() || !replica.checkVersionCatchUp(version, false)) {
            continue;
        }
        boolean isNewer = replica.getVersion() > bestVersion;
        boolean isSameVersionWithFewerRows = !isNewer
                && replica.getVersion() == bestVersion
                && replica.getRowCount() < bestRowCount;
        if (isNewer || isSameVersionWithFewerRows) {
            bestRowCount = replica.getRowCount();
            bestVersion = replica.getVersion();
        }
    }
    // The sentinel is still in place when no replica was selected.
    if (bestRowCount == Long.MAX_VALUE) {
        return 0;
    }
    return bestRowCount;
}

/**
* A replica is healthy only if
* 1. the backend is available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,17 @@ protected void runAfterCatalogReady() {
long indexRowCount = 0L;
boolean indexReported = true;
for (Tablet tablet : index.getTablets()) {
long tabletRowCount = 0L;
long tabletRowCount = Long.MAX_VALUE;
boolean tabletReported = false;
for (Replica replica : tablet.getReplicas()) {
LOG.debug("Table {} replica {} current version {}, report version {}",
olapTable.getName(), replica.getId(),
replica.getVersion(), replica.getLastReportVersion());
// When replica versions are identical, the replica with the smaller row count
// is more accurate than the others, because a smaller row count means that
// replica has done more compaction.
if (replica.checkVersionCatchUp(version, false)
&& replica.getRowCount() >= tabletRowCount) {
&& replica.getRowCount() < tabletRowCount) {
// 1. If replica version and reported replica version are all equal to
// PARTITION_INIT_VERSION, set tabletReported to true, which indicates this
// tablet is empty for sure when previous report.
Expand All @@ -139,6 +142,11 @@ protected void runAfterCatalogReady() {
tabletRowCount = replica.getRowCount();
}
}

// When all BEs are down, avoid adding Long.MAX_VALUE to the index and table row counts; use 0 instead.
if (tabletRowCount == Long.MAX_VALUE) {
tabletRowCount = 0L;
}
indexRowCount += tabletRowCount;
// Only when all tablets of this index are reported, we set indexReported to true.
indexReported = indexReported && tabletReported;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1560,10 +1560,35 @@ public PlanFragment visitPhysicalNestedLoopJoin(
public PlanFragment visitPhysicalLimit(PhysicalLimit<? extends Plan> physicalLimit, PlanTranslatorContext context) {
PlanFragment inputFragment = physicalLimit.child(0).accept(this, context);
PlanNode child = inputFragment.getPlanRoot();
child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), child.getLimit()));
// TODO: plan node don't support limit
// child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset()));
updateLegacyPlanIdToPhysicalPlan(child, physicalLimit);

if (physicalLimit.getPhase().isLocal()) {
child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(),
child.getLimit()));
} else if (physicalLimit.getPhase().isGlobal()) {
if (!(child instanceof ExchangeNode)) {
ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), child);
exchangeNode.setLimit(physicalLimit.getLimit());
exchangeNode.setOffset(physicalLimit.getOffset());
exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED);
exchangeNode.setNumInstances(1);
PlanFragment fragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
DataPartition.UNPARTITIONED);
inputFragment.setDestination(exchangeNode);
inputFragment.setOutputPartition(DataPartition.UNPARTITIONED);
DataStreamSink sink = new DataStreamSink(exchangeNode.getId());
sink.setOutputPartition(DataPartition.UNPARTITIONED);
inputFragment.setSink(sink);
context.addPlanFragment(fragment);
inputFragment = fragment;
} else {
ExchangeNode exchangeNode = (ExchangeNode) child;
exchangeNode.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(),
exchangeNode.getLimit()));
exchangeNode.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), exchangeNode.getOffset()));
}
}

updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), physicalLimit);
return inputFragment;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public List<PlanPostProcessor> getProcessors() {
builder.add(new ColumnPruningPostProcessor());
builder.add(new MergeProjectPostProcessor());
builder.add(new RecomputeLogicalPropertiesProcessor());
builder.add(new AddOffsetIntoDistribute());
builder.add(new TopNScanOpt());
// after generate rf, DO NOT replace PLAN NODE
builder.add(new FragmentProcessor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
Expand Down Expand Up @@ -108,47 +109,72 @@ public List<Rule> buildRules() {
logicalAggregate(
logicalFilter(
logicalOlapScan().when(this::isDupOrMowKeyTable).when(this::isInvertedIndexEnabledOnTable)
).when(filter -> !filter.getConjuncts().isEmpty()))
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().isEmpty())
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
return !funcs.isEmpty() && funcs.stream()
.allMatch(f -> f instanceof Count && !f.isDistinct() && (((Count) f).isStar()
|| f.children().isEmpty()
|| (f.children().size() == 1 && f.child(0) instanceof Literal)
|| f.child(0) instanceof Slot));
})
.thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root;
LogicalFilter<LogicalOlapScan> filter = agg.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, null, filter, olapScan, ctx.cascadesContext);
})
)
)
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().isEmpty())
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
if (funcs.isEmpty() || !funcs.stream()
.allMatch(f -> f instanceof Count && !f.isDistinct() && (((Count) f).isStar()
|| f.children().isEmpty()
|| (f.children().size() == 1 && f.child(0) instanceof Literal)
|| f.child(0) instanceof Slot))) {
return false;
}
Set<Expression> conjuncts = agg.child().getConjuncts();
if (conjuncts.isEmpty()) {
return false;
}

Set<Slot> aggSlots = funcs.stream()
.flatMap(f -> f.getInputSlots().stream())
.collect(Collectors.toSet());
return conjuncts.stream().allMatch(expr -> checkSlotInOrExpression(expr, aggSlots));
})
.thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root;
LogicalFilter<LogicalOlapScan> filter = agg.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, null, filter, olapScan, ctx.cascadesContext);
})
),
RuleType.COUNT_ON_INDEX.build(
logicalAggregate(
logicalProject(
logicalFilter(
logicalOlapScan().when(this::isDupOrMowKeyTable).when(this::isInvertedIndexEnabledOnTable)
).when(filter -> !filter.getConjuncts().isEmpty())))
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().isEmpty())
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
return !funcs.isEmpty() && funcs.stream()
.allMatch(f -> f instanceof Count && !f.isDistinct() && (((Count) f).isStar()
|| f.children().isEmpty()
|| (f.children().size() == 1 && f.child(0) instanceof Literal)
|| f.child(0) instanceof Slot));
})
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext);
})
)
)
)
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().isEmpty())
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
if (funcs.isEmpty() || !funcs.stream()
.allMatch(f -> f instanceof Count && !f.isDistinct() && (((Count) f).isStar()
|| f.children().isEmpty()
|| (f.children().size() == 1 && f.child(0) instanceof Literal)
|| f.child(0) instanceof Slot))) {
return false;
}
Set<Expression> conjuncts = agg.child().child().getConjuncts();
if (conjuncts.isEmpty()) {
return false;
}

Set<Slot> aggSlots = funcs.stream()
.flatMap(f -> f.getInputSlots().stream())
.collect(Collectors.toSet());
return conjuncts.stream().allMatch(expr -> checkSlotInOrExpression(expr, aggSlots));
})
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext);
})
),
RuleType.STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT.build(
logicalAggregate(
Expand Down Expand Up @@ -331,6 +357,22 @@ private boolean enablePushDownCountOnIndex() {
return connectContext != null && connectContext.getSessionVariable().isEnablePushDownCountOnIndex();
}

/**
 * Walks {@code expr} and verifies every OR expression found in it.
 *
 * @param expr the expression to inspect
 * @param aggSlots the slots an OR expression is allowed to reference
 * @return true when each OR expression reached by the walk only uses slots contained in
 *         {@code aggSlots}; an expression without children that is not an OR yields true
 */
private boolean checkSlotInOrExpression(Expression expr, Set<Slot> aggSlots) {
    if (expr instanceof Or) {
        // An OR is judged as a whole through its input slots; its operands are not visited.
        return aggSlots.containsAll(expr.getInputSlots());
    }
    return expr.children().stream()
            .allMatch(child -> checkSlotInOrExpression(child, aggSlots));
}

private boolean isDupOrMowKeyTable(LogicalOlapScan logicalScan) {
if (logicalScan != null) {
KeysType keysType = logicalScan.getTable().getKeysType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;

Expand Down Expand Up @@ -64,6 +65,7 @@ public class ExchangeNode extends PlanNode {
private SortInfo mergeInfo;

private boolean isRightChildOfBroadcastHashJoin = false;
private TPartitionType partitionType;

/**
* use for Nereids only.
Expand All @@ -77,6 +79,10 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode) {
computeTupleIds();
}

/**
 * Stores the given partition type in this node's {@code partitionType} field.
 *
 * @param partitionType the partition type to record on this exchange node
 */
public void setPartitionType(TPartitionType partitionType) {
this.partitionType = partitionType;
}

/**
* Create ExchangeNode that consumes output of inputNode.
* An ExchangeNode doesn't have an input node as a child, which is why we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
public Collection<Long> prune() throws AnalysisException {
Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();
for (Column column : partitionColumns) {
ColumnRange columnRange = columnNameToRange.get(column.getName());
ColumnRange columnRange = columnNameToRange.get(column.getName().toLowerCase());
if (columnRange == null) {
columnToFilters.put(column, FinalFilters.noFilters());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ protected Pair<List<Long>, Long> calcActualSampleTablets(boolean forPartitionCol
int seekTid = (int) ((i + seek) % ids.size());
long tabletId = ids.get(seekTid);
sampleTabletIds.add(tabletId);
actualSampledRowCount += materializedIndex.getTablet(tabletId).getRowCount(true);
actualSampledRowCount += materializedIndex.getTablet(tabletId)
.getMinReplicaRowCount(p.getVisibleVersion());
if (actualSampledRowCount >= sampleRows && !forPartitionColumn) {
enough = true;
break;
Expand Down
Loading

0 comments on commit 9dd88f1

Please sign in to comment.