Skip to content

Commit

Permalink
fix(interactive): Fix bugs of runtime (#4379)
Browse files Browse the repository at this point in the history
Fix bugs in scan and group by. 
Fix #4378
  • Loading branch information
zhanglei1949 authored Dec 25, 2024
1 parent 9699556 commit a9a865b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
31 changes: 31 additions & 0 deletions flex/engines/graph_db/runtime/adhoc/operators/group_by.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,29 @@ std::shared_ptr<IContextColumn> string_to_list(
return builder.finish();
}

template <typename T>
std::shared_ptr<IContextColumn> scalar_to_list(
const Var& var, const std::vector<std::vector<size_t>>& to_aggregate) {
ListValueColumnBuilder<T> builder;
size_t col_size = to_aggregate.size();
builder.reserve(col_size);
std::vector<std::shared_ptr<ListImplBase>> impls;
for (size_t k = 0; k < col_size; ++k) {
auto& vec = to_aggregate[k];

std::vector<T> elem;
for (auto idx : vec) {
elem.push_back(TypedConverter<T>::to_typed(var.get(idx)));
}
auto impl = ListImpl<T>::make_list_impl(std::move(elem));
auto list = List::make_list(impl);
impls.emplace_back(impl);
builder.push_back_opt(list);
}
builder.set_list_impls(impls);
return builder.finish();
}

bl::result<std::shared_ptr<IContextColumn>> apply_reduce(
const AggFunc& func, const std::vector<std::vector<size_t>>& to_aggregate) {
if (func.aggregate == AggrKind::kSum) {
Expand Down Expand Up @@ -497,6 +520,14 @@ bl::result<std::shared_ptr<IContextColumn>> apply_reduce(
return tuple_to_list(var, to_aggregate);
} else if (var.type() == RTAnyType::kStringValue) {
return string_to_list(var, to_aggregate);
} else if (var.type() == RTAnyType::kI32Value) {
return scalar_to_list<int32_t>(var, to_aggregate);
} else if (var.type() == RTAnyType::kI64Value) {
return scalar_to_list<int64_t>(var, to_aggregate);
} else if (var.type() == RTAnyType::kU64Value) {
return scalar_to_list<uint64_t>(var, to_aggregate);
} else if (var.type() == RTAnyType::kF64Value) {
return scalar_to_list<double>(var, to_aggregate);
} else {
LOG(FATAL) << "not support" << static_cast<int>(var.type().type_enum_);
}
Expand Down
3 changes: 1 addition & 2 deletions flex/engines/graph_db/runtime/common/operators/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ bl::result<Context> Scan::find_vertex_with_id(const ReadTransaction& txn,
}
if (GlobalId::get_label_id(gid) == label) {
vid = GlobalId::get_vid(gid);
builder.push_back_opt(vid);
} else {
LOG(ERROR) << "Global id " << gid << " does not match label " << label;
return Context();
}
builder.push_back_opt(vid);
Context ctx;
ctx.set(alias, builder.finish());
return ctx;
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/runtime/common/rt_any.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class ListImpl : ListImplBase {
static std::shared_ptr<ListImplBase> make_list_impl(std::vector<T>&& vals) {
auto new_list = new ListImpl<T>();
new_list->list_ = std::move(vals);
new_list->is_valid_.resize(new_list->list_.size(), true);
return std::shared_ptr<ListImplBase>(static_cast<ListImplBase*>(new_list));
}

Expand Down

0 comments on commit a9a865b

Please sign in to comment.