Skip to content

Commit

Permalink
[fix](planner) query should be cancelled if limit reached (#44338) (#…
Browse files Browse the repository at this point in the history
…45222)

cherry-pick #44338
  • Loading branch information
morningman authored Dec 10, 2024
1 parent 5d3f0a2 commit e29d125
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 44 deletions.
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0;
bool first_read = true;
int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
Expand Down Expand Up @@ -319,6 +320,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.push_back(std::move(free_block));
}

if (limit > 0 && limit < ctx->batch_size()) {
// If this scanner has a limit and the limit is less than the batch size,
// return immediately; there is no need to wait for raw_bytes_threshold.
// This saves time when each scanner returns only a small number of rows
// but the rows from all scanners together are already enough.
// Without this break, a query like "select * from tbl where id=1 limit 10"
// may scan a lot of data when the filter ratio of "id=1" is high.
// If the limit is larger than the batch size, this rule is skipped,
// so that a large user-specified limit does not cause too many small blocks.
break;
}
} // end for while

if (UNLIKELY(!status.ok())) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class VScanner {
_query_statistics = query_statistics;
}

// Returns this scanner's `_limit` value.
// NOTE(review): the scan scheduler only treats values > 0 as "limit set";
// the meaning of non-positive values is not visible here — confirm.
int64_t limit() const { return _limit; }

protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,6 @@ public String getExplainString(ExplainOptions explainOptions) {
return plan;
}

/**
 * Reports whether this planner's query is a "block query".
 *
 * @return always {@code true} for this planner implementation
 */
@Override
public boolean isBlockQuery() {
return true;
}

@Override
public DescriptorTable getDescTable() {
return descTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ public OriginalPlanner(Analyzer analyzer) {
this.analyzer = analyzer;
}

/**
 * Returns the current value of the {@code isBlockQuery} flag.
 *
 * @return the {@code isBlockQuery} field as-is
 */
public boolean isBlockQuery() {
return isBlockQuery;
}

/**
 * Returns the planner context held by this planner.
 *
 * @return the {@code plannerContext} field as-is
 */
public PlannerContext getPlannerContext() {
return plannerContext;
}
Expand Down Expand Up @@ -274,17 +270,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
isBlockQuery = true;
if (LOG.isDebugEnabled()) {
LOG.debug("this is block query");
}
} else {
isBlockQuery = false;
if (LOG.isDebugEnabled()) {
LOG.debug("this isn't block query");
}
}
// Check SelectStatement if optimization condition satisfied
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public abstract class Planner {

protected ArrayList<PlanFragment> fragments = Lists.newArrayList();

protected boolean isBlockQuery = false;

protected TQueryOptions queryOptions;

public abstract List<ScanNode> getScanNodes();
Expand Down Expand Up @@ -115,10 +113,6 @@ public List<PlanFragment> getFragments() {
return fragments;
}

/**
 * Returns the current value of the {@code isBlockQuery} flag.
 *
 * @return the {@code isBlockQuery} field as-is
 */
public boolean isBlockQuery() {
return isBlockQuery;
}

/**
 * Returns the query options held by this planner.
 *
 * @return the {@code queryOptions} field as-is
 */
public TQueryOptions getQueryOptions() {
return queryOptions;
}
Expand Down
36 changes: 18 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();

private final boolean isBlockQuery;

private int numReceivedRows = 0;

private List<String> deltaUrls;
Expand Down Expand Up @@ -336,7 +334,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
Expand Down Expand Up @@ -379,7 +376,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
Expand Down Expand Up @@ -1448,24 +1444,28 @@ public RowBatch getNext() throws Exception {
}
}

if (resultBatch.isEos()) {
this.returnedAllResults = true;

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
if (LOG.isDebugEnabled()) {
LOG.debug("number received rows: {}, {}", numReceivedRows, DebugUtil.printId(queryId));
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
}

if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
if (resultBatch.isEos()) {
numReceivedRows = 0;
numReceivedRows += resultBatch.getQueryStatistics().getReturnedRows();
}
} else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}

Long limitRows = fragments.get(0).getPlanRoot().getLimit();
if (limitRows > 0 && numReceivedRows >= limitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("reach limit rows: {}, received rows: {}, cancel query, {}",
limitRows, numReceivedRows, DebugUtil.printId(queryId));
}
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, "reach limit");
resultBatch.setEos(true);
}

return resultBatch;
Expand Down

0 comments on commit e29d125

Please sign in to comment.