Skip to content

Commit

Permalink
[fix](scanner) Delete meaningless finish dependency in schema scanner (
Browse files Browse the repository at this point in the history
…#44915)

Finish dependency is used to block `close` phase in order to release
some necessary resources safely. In schema scanner, an async thread is
used to do scanning and finish dependency is used to wait it done. But
in fact, we could close schema scanner directly now so this PR delete
this finish dependency.
  • Loading branch information
Gabriel39 authored Dec 4, 2024
1 parent 6f59658 commit b8dcac0
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 19 deletions.
4 changes: 0 additions & 4 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
}
SCOPED_ATTACH_TASK(state);
_async_thread_running = true;
_finish_dependency->block();
if (!_opened) {
_data_block = vectorized::Block::create_unique();
_init_block(_data_block.get());
Expand All @@ -140,9 +139,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
_eos = eos;
_async_thread_running = false;
_dependency->set_ready();
if (eos) {
_finish_dependency->set_ready();
}
}));
return Status::OK();
}
Expand Down
7 changes: 1 addition & 6 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,7 @@ class SchemaScanner {
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep) {
_dependency = dep;
_finish_dependency = fin_dep;
}
void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
Status get_next_block_async(RuntimeState* state);

protected:
Expand Down Expand Up @@ -139,7 +135,6 @@ class SchemaScanner {
RuntimeProfile::Counter* _fill_block_timer = nullptr;

std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

std::unique_ptr<vectorized::Block> _data_block;
AtomicStatus _scanner_status;
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// new one scanner
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());

_schema_scanner->set_dependency(_data_dependency, _finish_dependency);
_schema_scanner->set_dependency(_data_dependency);
if (nullptr == _schema_scanner) {
return Status::InternalError("schema scanner get nullptr pointer.");
}
Expand Down Expand Up @@ -266,9 +266,6 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/schema_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {

SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent) {
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
_data_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_DEPENDENCY", true);
}
Expand All @@ -48,7 +45,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {

Status open(RuntimeState* state) override;

Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override { return {_data_dependency.get()}; }

private:
Expand All @@ -57,7 +53,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {
SchemaScannerParam _scanner_param;
std::unique_ptr<SchemaScanner> _schema_scanner;

std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _data_dependency;
};

Expand Down

0 comments on commit b8dcac0

Please sign in to comment.