Skip to content

Commit

Permalink
Fix BE core dump when a partial table load fails
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed May 30, 2024
1 parent a48d93f commit de00c81
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
5 changes: 4 additions & 1 deletion be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_util.h"
#include "util/time.h"
Expand Down Expand Up @@ -222,8 +223,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_unplanned_pipes.size(), _planned_pipes.size(), params.size());
_unplanned_pipes.clear();

_inflight_cnt += params.size();
for (auto& plan : params) {
DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
{ return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
return Status::Aborted("Missing vital param: table_name");
Expand All @@ -243,6 +245,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
CHECK(false);
}

_inflight_cnt++;
exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
Status* status) {
{
Expand Down
34 changes: 29 additions & 5 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
} \
} while (false);

// Error-handling macro used on the multi-table path of exec_task.
//
// Evaluates `stmt` exactly once into a local Status. When that status is
// neither OK nor PUBLISH_TIMEOUT, the macro:
//   1. reports the failure via err_handler(ctx, status, err_msg);
//   2. invokes the completion callback cb(ctx);
//   3. calls ctx->future.get(), overwriting the local status with its result,
//      and logs an error if that result is not OK
//      (NOTE(review): presumably this waits for plan fragments that were
//      already started for some tables to finish before the task returns --
//      confirm against MultiTablePipe);
//   4. returns from the enclosing (void) function.
// An OK or PUBLISH_TIMEOUT status falls through and execution continues.
//
// Requires `ctx`, `err_handler` and `cb` to be in scope at the expansion site.
// The definition already ends with `while (false);`, so it carries its own
// trailing semicolon.
#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg) \
do { \
Status _status_ = (stmt); \
if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
err_handler(ctx, _status_, err_msg); \
cb(ctx); \
_status_ = ctx->future.get(); \
if (!_status_.ok()) { \
LOG(ERROR) << "failed to get future, " << ctx->brief(); \
} \
return; \
} \
} while (false);

LOG(INFO) << "begin to execute routine load task: " << ctx->brief();

// create data consumer group
Expand Down Expand Up @@ -338,17 +352,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);

// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

if (ctx->is_multi_table) {
Status st;
// plan the rest of unplanned data
auto multi_table_pipe = std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
"multi tables task executes plan error");
// start to consume, this may block a while
st = consumer_grp->start_all(ctx, kafka_pipe);
if (!st.ok()) {
multi_table_pipe->handle_consume_finished();
HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
}
st = multi_table_pipe->request_and_exec_plans();
if (!st.ok()) {
multi_table_pipe->handle_consume_finished();
HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
}
// need memory order
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
} else {
// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
}

// wait for all consumers finished
Expand Down

0 comments on commit de00c81

Please sign in to comment.