From 33a885c59948e7a1fc0c616302edee7fadf5d0ab Mon Sep 17 00:00:00 2001 From: PeterWeiWang <715533650@qq.com> Date: Wed, 2 Mar 2022 09:43:25 +0800 Subject: [PATCH] support for parallel queries - fix mtr test. --- sql/handler.cc | 2 +- sql/join_optimizer/explain_access_path.cc | 5 +---- sql/opt_explain.cc | 6 +++++- sql/pq_clone_item.cc | 20 ++++++++++++++++-- sql/pq_condition.cc | 9 +++++++- sql/sp_head.cc | 12 +++++------ sql/sql_class.cc | 5 +++-- sql/sql_class.h | 5 ++++- sql/sql_parallel.cc | 25 +++++++++++++++-------- sql/sql_select.cc | 4 ++-- sql/sql_tmp_table.cc | 14 ++++++++----- sql/sql_tmp_table.h | 3 ++- 12 files changed, 76 insertions(+), 34 deletions(-) diff --git a/sql/handler.cc b/sql/handler.cc index 6d4a8873e8a8..f7f215c86a1c 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -6519,7 +6519,7 @@ int DsMrr_impl::dsmrr_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param, return retval; } - if (!thd->in_sp_trigger && thd->parallel_exec && + if (thd->in_sp_trigger == 0 && thd->parallel_exec && table->file->pq_range_type != PQ_QUICK_SELECT_NONE) { use_default_impl = true; retval = h->handler::multi_range_read_init(seq_funcs, seq_init_param, diff --git a/sql/join_optimizer/explain_access_path.cc b/sql/join_optimizer/explain_access_path.cc index 2ebf6e073551..468360c5b0aa 100644 --- a/sql/join_optimizer/explain_access_path.cc +++ b/sql/join_optimizer/explain_access_path.cc @@ -807,10 +807,7 @@ ExplainData ExplainAccessPath(const AccessPath *path, JOIN *join) { path->parallel_scan().table->alias + path->parallel_scan().table->file->explain_extra()); Gather_operator *gather = path->parallel_scan().gather; - AccessPath *root_path = gather->m_template_join->m_root_access_path; - root_path = root_path ? root_path - : (gather->m_workers[0] - ->thd_worker->lex->unit->m_root_access_path); + AccessPath *root_path = gather->m_workers[0]->thd_worker->lex->unit->m_root_access_path; children.push_back({root_path, "", gather->m_template_join}); break; } diff --git a/sql/opt_explain.cc b/sql/opt_explain.cc index e755db33eb56..7eb6d78676fa 100644 --- a/sql/opt_explain.cc +++ b/sql/opt_explain.cc @@ -2134,9 +2134,13 @@ static bool ExplainIterator(THD *ethd, const THD *query_thd, default: break; } - explain += PrintQueryPlan(base_level, unit->root_access_path(), + if (ethd->parallel_exec && ethd->lex->is_explain_analyze) { + explain += ethd->pq_explain; + } else { + explain += PrintQueryPlan(base_level, unit->root_access_path(), unit->is_union() ? nullptr : join, /*is_root_of_join=*/!unit->is_union()); + } } else { explain += PrintQueryPlan(0, /*path=*/nullptr, /*join=*/nullptr, /*is_root_of_join=*/false); diff --git a/sql/pq_clone_item.cc b/sql/pq_clone_item.cc index 3dad94c35a62..0aef4f5a043a 100644 --- a/sql/pq_clone_item.cc +++ b/sql/pq_clone_item.cc @@ -420,6 +420,14 @@ Item *Item_view_ref::pq_clone(class THD *thd, class Query_block *select) { } item_ref = &found_table->field_translation[field_index].item; + + const char *db_name; + if (found_table->is_view()) { + db_name = found_table->db; + } else { + db_name = nullptr; + } + item = new (thd->pq_mem_root) Item_view_ref(&select->context, item_ref, db_name, table_name, orig_table_name(), field_name, found_table); @@ -1596,8 +1604,16 @@ PQ_CLONE_RETURN COPY_FUNC_ITEM(Item_typecast_real, ARG0) PQ_CLONE_DEF(Item_func_get_system_var) { - new_item = new (thd->pq_mem_root) Item_func_get_system_var( - var, var_type, &component, item_name.ptr(), item_name.length()); + sys_var *var_arg = var; + + if (var_arg == nullptr) { + var_arg = var_tracker.bind_system_variable(thd); + } + + if (var_arg != nullptr) { + new_item = new (thd->pq_mem_root) Item_func_get_system_var( + var_arg, var_type, &component, item_name.ptr(), item_name.length()); + } } PQ_CLONE_RETURN diff --git a/sql/pq_condition.cc b/sql/pq_condition.cc index 39a680dfca14..a814d4426def 100644 --- a/sql/pq_condition.cc +++ b/sql/pq_condition.cc @@ -695,7 +695,7 @@ void set_pq_condition_status(THD *thd) { } bool suite_for_parallel_query(THD *thd) { - if (thd->in_sp_trigger || // store procedure or trigger + if (thd->in_sp_trigger != 0 || // store procedure or trigger thd->m_attachable_trx || // attachable transaction thd->tx_isolation == ISO_SERIALIZABLE) { // serializable without snapshot read @@ -760,6 +760,13 @@ bool suite_for_parallel_query(Query_block *select) { } } + for (TABLE_LIST *tbl_list = select->table_list.first; tbl_list != nullptr; + tbl_list = tbl_list->next_global) { + if (!suite_for_parallel_query(tbl_list)) { + return false; + } + } + for (TABLE_LIST *tbl_list = select->leaf_tables; tbl_list != nullptr; tbl_list = tbl_list->next_leaf) { if (!suite_for_parallel_query(tbl_list)) { diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 29ed8ec52a7b..f1204ad7e30d 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -2490,9 +2490,9 @@ bool sp_head::execute_trigger(THD *thd, const LEX_CSTRING &db_name, locker = MYSQL_START_SP(&psi_state, m_sp_share); #endif // disable paralle query for trigger - thd->in_sp_trigger = true; + thd->in_sp_trigger += 1; err_status = execute(thd, false); - thd->in_sp_trigger = false; + thd->in_sp_trigger -= 1; #ifdef HAVE_PSI_SP_INTERFACE MYSQL_END_SP(locker); #endif @@ -2671,9 +2671,9 @@ bool sp_head::execute_function(THD *thd, Item **argp, uint argcount, locker = MYSQL_START_SP(&psi_state, m_sp_share); #endif // disable paralle query for store function - thd->in_sp_trigger = true; + thd->in_sp_trigger += 1; err_status = execute(thd, true); - thd->in_sp_trigger = false; + thd->in_sp_trigger -= 1; #ifdef HAVE_PSI_SP_INTERFACE MYSQL_END_SP(locker); #endif @@ -2876,9 +2876,9 @@ bool sp_head::execute_procedure(THD *thd, mem_root_deque *args) { locker = MYSQL_START_SP(&psi_state, m_sp_share); #endif // disable parallel query for store procedure - thd->in_sp_trigger = true; + thd->in_sp_trigger += 1; if (!err_status) err_status = execute(thd, true); - thd->in_sp_trigger = false; + thd->in_sp_trigger -= 1; #ifdef HAVE_PSI_SP_INTERFACE MYSQL_END_SP(locker); #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index b152ca544935..d4092d955afe 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -412,7 +412,7 @@ THD::THD(bool enable_plugins) pq_threads_running(0), pq_dop(0), no_pq(false), - in_sp_trigger(false), + in_sp_trigger(0), locking_clause(0), pq_error(false), pq_check_fields(0), @@ -1559,7 +1559,7 @@ void THD::cleanup_after_query() { // Set the default "cute" mode for the execution environment: check_for_truncated_fields = CHECK_FIELD_IGNORE; - if (!in_sp_trigger) { + if (in_sp_trigger == 0) { // cleanup for parallel query if (pq_threads_running > 0) { release_pq_running_threads(pq_threads_running); @@ -1572,6 +1572,7 @@ void THD::cleanup_after_query() { locking_clause = 0; pq_error = false; pq_workers.clear(); + pq_explain.clear(); if (killed == THD::KILL_PQ_QUERY) killed.store(THD::NOT_KILLED); // restore killed for next query diff --git a/sql/sql_class.h b/sql/sql_class.h index 5639c51cc703..3815220d1867 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -951,7 +951,7 @@ class THD : public MDL_context_owner, /* disable parallel execute */ bool no_pq; /* disable parallel query for store procedure and trigger */ - bool in_sp_trigger; + uint in_sp_trigger; /* select .. fro share/update */ bool locking_clause; /* indicates whether parallel query is supported */ @@ -972,6 +972,9 @@ class THD : public MDL_context_owner, /* protects THD::pq_workers. */ mysql_mutex_t pq_lock_worker; + /* for explain analyze. */ + std::string pq_explain; + /* Used to execute base64 coded binlog events in MySQL server */ Relay_log_info *rli_fake; /* Slave applier execution context */ diff --git a/sql/sql_parallel.cc b/sql/sql_parallel.cc index dbd991aa12d7..8b03c28c87e8 100644 --- a/sql/sql_parallel.cc +++ b/sql/sql_parallel.cc @@ -47,6 +47,7 @@ #include "sql/sql_tmp_table.h" #include "sql/timing_iterator.h" #include "sql/transaction.h" +#include "sql/join_optimizer/explain_access_path.h" ulonglong parallel_memory_limit = 0; ulong parallel_max_threads = 0; @@ -785,6 +786,14 @@ void *pq_worker_exec(void *arg) { mngr->signal_status(thd, PQ_worker_state::READY); join->query_expression()->ExecuteIteratorQuery(thd); + if (thd->lex->is_explain_analyze && mngr->m_id == 0) { + Query_expression *unit = leader_thd->lex->unit; + leader_thd->pq_explain += PrintQueryPlan( + 0, unit->root_access_path(), + unit->is_union() ? nullptr : unit->first_query_block()->join, + !unit->is_union()); + } + if (join->thd->is_error() || join->thd->pq_error || DBUG_EVALUATE_IF("pq_worker_error3", true, false)) { goto err; @@ -1136,18 +1145,18 @@ bool set_key_order(QEP_TAB *tab, std::vector &key_fields, return false; } - std::map fields_map; // map[field] = item - std::map::iterator iter; - std::vector order_items; + std::map fields_map; // map[field] = item + std::map::iterator iter; + std::vector order_items; - Ref_item_array ref_items = *ref_ptrs; + Ref_item_array &ref_items = *ref_ptrs; /** (1) build the map: {name} -> {item} */ for (uint i = 0; i < join->query_block_fields->size(); i++) { Item *item = ref_items[i]; if (item && item->type() == Item::FIELD_ITEM) { std::string field_name = static_cast(item)->field->field_name; - fields_map[field_name] = item; + fields_map[field_name] = &ref_items[i]; } } @@ -1163,15 +1172,15 @@ bool set_key_order(QEP_TAB *tab, std::vector &key_fields, THD *thd = join->thd; SQL_I_List order_list; - for (Item *item : order_items) { + for (Item **item : order_items) { ORDER *order = new (thd->pq_mem_root) ORDER(); if (!order) { *order_ptr = NULL; return true; } - order->item_initial = item; - order->item = &item; + order->item_initial = *item; + order->item = item; order->in_field_list = 1; order->is_explicit = 0; add_to_list(order_list, order); diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 87afc8cd4821..c4110b07cc96 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -3741,7 +3741,7 @@ bool JOIN::make_pq_tables_info() { in tmplist, which we call it sum_field that use for recieving PQ workers's sum data. */ table = create_tmp_table(thd, tmp_param, *curr_fields, nullptr, false, true, - query_block->active_options(), HA_POS_ERROR, "", true); + query_block->active_options(), HA_POS_ERROR, "", true, true); query_result->m_table = table; // the leader/worker's table is not same @@ -4409,7 +4409,7 @@ bool JOIN::make_leader_tables_info() { */ TABLE *table = create_tmp_table(thd, tmp_param, tmplist, nullptr, false, true, - query_block->active_options(), HA_POS_ERROR, "", true); + query_block->active_options(), HA_POS_ERROR, "", true, true); if (table == nullptr) { DBUG_RETURN(true); } table->materialized= false; tmp_tables = 1; diff --git a/sql/sql_tmp_table.cc b/sql/sql_tmp_table.cc index 761be4ef2352..a166910b8e9c 100644 --- a/sql/sql_tmp_table.cc +++ b/sql/sql_tmp_table.cc @@ -904,7 +904,8 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param, const mem_root_deque &fields, ORDER *group, bool distinct, bool save_sum_fields, ulonglong select_options, ha_rows rows_limit, - const char *table_alias, bool force_disk_table) { + const char *table_alias, bool force_disk_table, + bool parallel_query) { DBUG_TRACE; if (!param->allow_group_via_temp_table) group = nullptr; // Can't use group key @@ -1065,10 +1066,13 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param, if (param->m_window == nullptr || !param->m_window->is_last()) store_column = false; } - if (item->const_item() && (int)hidden_field_count <= 0) { - // mark this item and then we can identify it without sending a message to MQ. - item->skip_create_tmp_table = true; - continue; // We don't have to store this + + if (item->const_item()) { + if (parallel_query || (int)hidden_field_count <= 0) { + // mark this item and then we can identify it without sending a message to MQ. + item->skip_create_tmp_table = true; + continue; // We don't have to store this + } } } diff --git a/sql/sql_tmp_table.h b/sql/sql_tmp_table.h index 82138650286f..16338aa00b69 100644 --- a/sql/sql_tmp_table.h +++ b/sql/sql_tmp_table.h @@ -56,7 +56,8 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param, const mem_root_deque &fields, ORDER *group, bool distinct, bool save_sum_fields, ulonglong select_options, ha_rows rows_limit, - const char *table_alias, bool force_disk_table = false); + const char *table_alias, bool force_disk_table = false, + bool parallel_query = false); bool open_tmp_table(TABLE *table); TABLE *create_tmp_table_from_fields(THD *thd, List &field_list, bool is_virtual = true,