Skip to content

Commit

Permalink
[fix](multi-table) fix single stream multi table load can not finish (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Oct 13, 2023
1 parent 283bd59 commit 2ec53ff
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
7 changes: 2 additions & 5 deletions be/src/runtime/routine_load/data_consumer_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <utility>

#include "common/logging.h"
#include "io/fs/kafka_consumer_pipe.h"
#include "librdkafka/rdkafkacpp.h"
#include "runtime/routine_load/data_consumer.h"
#include "runtime/stream_load/stream_load_context.h"
Expand Down Expand Up @@ -72,7 +71,8 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
DCHECK(_queue.get_size() == 0);
}

Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) {
Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
Status result_st = Status::OK();
// start all consumers
for (auto& consumer : _consumers) {
Expand Down Expand Up @@ -105,9 +105,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx)
int64_t left_rows = ctx->max_batch_rows;
int64_t left_bytes = ctx->max_batch_size;

std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);

LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
<< ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
<< ctx->brief();
Expand Down
9 changes: 7 additions & 2 deletions be/src/runtime/routine_load/data_consumer_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "common/status.h"
#include "io/fs/kafka_consumer_pipe.h"
#include "runtime/routine_load/data_consumer.h"
#include "util/blocking_queue.hpp"
#include "util/uid_util.h"
Expand Down Expand Up @@ -60,7 +61,10 @@ class DataConsumerGroup {
}

// start all consumers
virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return Status::OK(); }
virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
return Status::OK();
}

protected:
UniqueId _grp_id;
Expand All @@ -82,7 +86,8 @@ class KafkaDataConsumerGroup : public DataConsumerGroup {

virtual ~KafkaDataConsumerGroup();

Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
Status start_all(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;
// assign topic partitions to all consumers equally
Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);

Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,11 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
#endif
}

std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);

// start to consume, this may block a while
HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed");
HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

if (ctx->is_multi_table) {
// plan the rest of unplanned data
Expand All @@ -346,6 +349,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
"multi tables task executes plan error");
// need memory order
multi_table_pipe->set_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
}

// wait for all consumers finished
Expand Down

0 comments on commit 2ec53ff

Please sign in to comment.