Skip to content

Commit

Permalink
support for parallel queries
Browse files Browse the repository at this point in the history
- fix mtr test.
  • Loading branch information
PeterWeiWang committed Mar 2, 2022
1 parent de75c58 commit 33a885c
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 34 deletions.
2 changes: 1 addition & 1 deletion sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6519,7 +6519,7 @@ int DsMrr_impl::dsmrr_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
return retval;
}

if (!thd->in_sp_trigger && thd->parallel_exec &&
if (thd->in_sp_trigger == 0 && thd->parallel_exec &&
table->file->pq_range_type != PQ_QUICK_SELECT_NONE) {
use_default_impl = true;
retval = h->handler::multi_range_read_init(seq_funcs, seq_init_param,
Expand Down
5 changes: 1 addition & 4 deletions sql/join_optimizer/explain_access_path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,7 @@ ExplainData ExplainAccessPath(const AccessPath *path, JOIN *join) {
path->parallel_scan().table->alias +
path->parallel_scan().table->file->explain_extra());
Gather_operator *gather = path->parallel_scan().gather;
AccessPath *root_path = gather->m_template_join->m_root_access_path;
root_path = root_path ? root_path
: (gather->m_workers[0]
->thd_worker->lex->unit->m_root_access_path);
AccessPath *root_path = gather->m_workers[0]->thd_worker->lex->unit->m_root_access_path;
children.push_back({root_path, "", gather->m_template_join});
break;
}
Expand Down
6 changes: 5 additions & 1 deletion sql/opt_explain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2134,9 +2134,13 @@ static bool ExplainIterator(THD *ethd, const THD *query_thd,
default:
break;
}
explain += PrintQueryPlan(base_level, unit->root_access_path(),
if (ethd->parallel_exec && ethd->lex->is_explain_analyze) {
explain += ethd->pq_explain;
} else {
explain += PrintQueryPlan(base_level, unit->root_access_path(),
unit->is_union() ? nullptr : join,
/*is_root_of_join=*/!unit->is_union());
}
} else {
explain += PrintQueryPlan(0, /*path=*/nullptr, /*join=*/nullptr,
/*is_root_of_join=*/false);
Expand Down
20 changes: 18 additions & 2 deletions sql/pq_clone_item.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ Item *Item_view_ref::pq_clone(class THD *thd, class Query_block *select) {
}

item_ref = &found_table->field_translation[field_index].item;

const char *db_name;
if (found_table->is_view()) {
db_name = found_table->db;
} else {
db_name = nullptr;
}

item = new (thd->pq_mem_root)
Item_view_ref(&select->context, item_ref, db_name, table_name,
orig_table_name(), field_name, found_table);
Expand Down Expand Up @@ -1596,8 +1604,16 @@ PQ_CLONE_RETURN
COPY_FUNC_ITEM(Item_typecast_real, ARG0)

PQ_CLONE_DEF(Item_func_get_system_var) {
new_item = new (thd->pq_mem_root) Item_func_get_system_var(
var, var_type, &component, item_name.ptr(), item_name.length());
sys_var *var_arg = var;

if (var_arg == nullptr) {
var_arg = var_tracker.bind_system_variable(thd);
}

if (var_arg != nullptr) {
new_item = new (thd->pq_mem_root) Item_func_get_system_var(
var_arg, var_type, &component, item_name.ptr(), item_name.length());
}
}
PQ_CLONE_RETURN

Expand Down
9 changes: 8 additions & 1 deletion sql/pq_condition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ void set_pq_condition_status(THD *thd) {
}

bool suite_for_parallel_query(THD *thd) {
if (thd->in_sp_trigger || // store procedure or trigger
if (thd->in_sp_trigger != 0 || // store procedure or trigger
thd->m_attachable_trx || // attachable transaction
thd->tx_isolation ==
ISO_SERIALIZABLE) { // serializable without snapshot read
Expand Down Expand Up @@ -760,6 +760,13 @@ bool suite_for_parallel_query(Query_block *select) {
}
}

for (TABLE_LIST *tbl_list = select->table_list.first; tbl_list != nullptr;
tbl_list = tbl_list->next_global) {
if (!suite_for_parallel_query(tbl_list)) {
return false;
}
}

for (TABLE_LIST *tbl_list = select->leaf_tables; tbl_list != nullptr;
tbl_list = tbl_list->next_leaf) {
if (!suite_for_parallel_query(tbl_list)) {
Expand Down
12 changes: 6 additions & 6 deletions sql/sp_head.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2490,9 +2490,9 @@ bool sp_head::execute_trigger(THD *thd, const LEX_CSTRING &db_name,
locker = MYSQL_START_SP(&psi_state, m_sp_share);
#endif
// disable paralle query for trigger
thd->in_sp_trigger = true;
thd->in_sp_trigger += 1;
err_status = execute(thd, false);
thd->in_sp_trigger = false;
thd->in_sp_trigger -= 1;
#ifdef HAVE_PSI_SP_INTERFACE
MYSQL_END_SP(locker);
#endif
Expand Down Expand Up @@ -2671,9 +2671,9 @@ bool sp_head::execute_function(THD *thd, Item **argp, uint argcount,
locker = MYSQL_START_SP(&psi_state, m_sp_share);
#endif
// disable paralle query for store function
thd->in_sp_trigger = true;
thd->in_sp_trigger += 1;
err_status = execute(thd, true);
thd->in_sp_trigger = false;
thd->in_sp_trigger -= 1;
#ifdef HAVE_PSI_SP_INTERFACE
MYSQL_END_SP(locker);
#endif
Expand Down Expand Up @@ -2876,9 +2876,9 @@ bool sp_head::execute_procedure(THD *thd, mem_root_deque<Item *> *args) {
locker = MYSQL_START_SP(&psi_state, m_sp_share);
#endif
// disable parallel query for store procedure
thd->in_sp_trigger = true;
thd->in_sp_trigger += 1;
if (!err_status) err_status = execute(thd, true);
thd->in_sp_trigger = false;
thd->in_sp_trigger -= 1;
#ifdef HAVE_PSI_SP_INTERFACE
MYSQL_END_SP(locker);
#endif
Expand Down
5 changes: 3 additions & 2 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ THD::THD(bool enable_plugins)
pq_threads_running(0),
pq_dop(0),
no_pq(false),
in_sp_trigger(false),
in_sp_trigger(0),
locking_clause(0),
pq_error(false),
pq_check_fields(0),
Expand Down Expand Up @@ -1559,7 +1559,7 @@ void THD::cleanup_after_query() {
// Set the default "cute" mode for the execution environment:
check_for_truncated_fields = CHECK_FIELD_IGNORE;

if (!in_sp_trigger) {
if (in_sp_trigger == 0) {
// cleanup for parallel query
if (pq_threads_running > 0) {
release_pq_running_threads(pq_threads_running);
Expand All @@ -1572,6 +1572,7 @@ void THD::cleanup_after_query() {
locking_clause = 0;
pq_error = false;
pq_workers.clear();
pq_explain.clear();

if (killed == THD::KILL_PQ_QUERY)
killed.store(THD::NOT_KILLED); // restore killed for next query
Expand Down
5 changes: 4 additions & 1 deletion sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ class THD : public MDL_context_owner,
/* disable parallel execute */
bool no_pq;
/* disable parallel query for store procedure and trigger */
bool in_sp_trigger;
uint in_sp_trigger;
/* select .. fro share/update */
bool locking_clause;
/* indicates whether parallel query is supported */
Expand All @@ -972,6 +972,9 @@ class THD : public MDL_context_owner,
/* protects THD::pq_workers. */
mysql_mutex_t pq_lock_worker;

/* for explain analyze. */
std::string pq_explain;

/* Used to execute base64 coded binlog events in MySQL server */
Relay_log_info *rli_fake;
/* Slave applier execution context */
Expand Down
25 changes: 17 additions & 8 deletions sql/sql_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "sql/sql_tmp_table.h"
#include "sql/timing_iterator.h"
#include "sql/transaction.h"
#include "sql/join_optimizer/explain_access_path.h"

ulonglong parallel_memory_limit = 0;
ulong parallel_max_threads = 0;
Expand Down Expand Up @@ -785,6 +786,14 @@ void *pq_worker_exec(void *arg) {
mngr->signal_status(thd, PQ_worker_state::READY);
join->query_expression()->ExecuteIteratorQuery(thd);

if (thd->lex->is_explain_analyze && mngr->m_id == 0) {
Query_expression *unit = leader_thd->lex->unit;
leader_thd->pq_explain += PrintQueryPlan(
0, unit->root_access_path(),
unit->is_union() ? nullptr : unit->first_query_block()->join,
!unit->is_union());
}

if (join->thd->is_error() || join->thd->pq_error ||
DBUG_EVALUATE_IF("pq_worker_error3", true, false)) {
goto err;
Expand Down Expand Up @@ -1136,18 +1145,18 @@ bool set_key_order(QEP_TAB *tab, std::vector<std::string> &key_fields,
return false;
}

std::map<std::string, Item *> fields_map; // map[field] = item
std::map<std::string, Item *>::iterator iter;
std::vector<Item *> order_items;
std::map<std::string, Item **> fields_map; // map[field] = item
std::map<std::string, Item **>::iterator iter;
std::vector<Item **> order_items;

Ref_item_array ref_items = *ref_ptrs;
Ref_item_array &ref_items = *ref_ptrs;
/** (1) build the map: {name} -> {item} */
for (uint i = 0; i < join->query_block_fields->size(); i++) {
Item *item = ref_items[i];
if (item && item->type() == Item::FIELD_ITEM) {
std::string field_name =
static_cast<Item_field *>(item)->field->field_name;
fields_map[field_name] = item;
fields_map[field_name] = &ref_items[i];
}
}

Expand All @@ -1163,15 +1172,15 @@ bool set_key_order(QEP_TAB *tab, std::vector<std::string> &key_fields,
THD *thd = join->thd;
SQL_I_List<ORDER> order_list;

for (Item *item : order_items) {
for (Item **item : order_items) {
ORDER *order = new (thd->pq_mem_root) ORDER();
if (!order) {
*order_ptr = NULL;
return true;
}

order->item_initial = item;
order->item = &item;
order->item_initial = *item;
order->item = item;
order->in_field_list = 1;
order->is_explicit = 0;
add_to_list(order_list, order);
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3741,7 +3741,7 @@ bool JOIN::make_pq_tables_info() {
in tmplist, which we call it sum_field that use for recieving PQ workers's sum data.
*/
table = create_tmp_table(thd, tmp_param, *curr_fields, nullptr, false, true,
query_block->active_options(), HA_POS_ERROR, "", true);
query_block->active_options(), HA_POS_ERROR, "", true, true);
query_result->m_table = table;

// the leader/worker's table is not same
Expand Down Expand Up @@ -4409,7 +4409,7 @@ bool JOIN::make_leader_tables_info() {
*/
TABLE *table =
create_tmp_table(thd, tmp_param, tmplist, nullptr, false, true,
query_block->active_options(), HA_POS_ERROR, "", true);
query_block->active_options(), HA_POS_ERROR, "", true, true);
if (table == nullptr) { DBUG_RETURN(true); }
table->materialized= false;
tmp_tables = 1;
Expand Down
14 changes: 9 additions & 5 deletions sql/sql_tmp_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param,
const mem_root_deque<Item *> &fields, ORDER *group,
bool distinct, bool save_sum_fields,
ulonglong select_options, ha_rows rows_limit,
const char *table_alias, bool force_disk_table) {
const char *table_alias, bool force_disk_table,
bool parallel_query) {
DBUG_TRACE;
if (!param->allow_group_via_temp_table)
group = nullptr; // Can't use group key
Expand Down Expand Up @@ -1065,10 +1066,13 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param,
if (param->m_window == nullptr || !param->m_window->is_last())
store_column = false;
}
if (item->const_item() && (int)hidden_field_count <= 0) {
// mark this item and then we can identify it without sending a message to MQ.
item->skip_create_tmp_table = true;
continue; // We don't have to store this

if (item->const_item()) {
if (parallel_query || (int)hidden_field_count <= 0) {
// mark this item and then we can identify it without sending a message to MQ.
item->skip_create_tmp_table = true;
continue; // We don't have to store this
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion sql/sql_tmp_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ TABLE *create_tmp_table(THD *thd, Temp_table_param *param,
const mem_root_deque<Item *> &fields, ORDER *group,
bool distinct, bool save_sum_fields,
ulonglong select_options, ha_rows rows_limit,
const char *table_alias, bool force_disk_table = false);
const char *table_alias, bool force_disk_table = false,
bool parallel_query = false);
bool open_tmp_table(TABLE *table);
TABLE *create_tmp_table_from_fields(THD *thd, List<Create_field> &field_list,
bool is_virtual = true,
Expand Down

0 comments on commit 33a885c

Please sign in to comment.