From 2397f8a73f40b0e32d713159d057a02374825a20 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Fri, 6 Sep 2024 16:39:06 +0800 Subject: [PATCH] refactor(interactive): Add error handling for adhoc queries (#4188) We Introduce boost::leaf for lightweight error handling in the Interactive Ad Hoc Query Service. For each operator, we refactor it returns a `bl::result` value, allowing the error to be caught and returned at the most outside, i.e. `adhoc_app.cc`. In this PR, we only wrap the errors about `UNSUPPORTED` `UNIMPLEMENTED` and `BAD_REQUET` at operator level. For lower level errors, we still let it crash. TBD: - GAE depends on boost::leaf via vineyard, but we don't want to depend on vineyard, so a new submodule is introduced, is this ok? Fix #4189 We will continue to resolve these `UNSUPPORTED` and `UNIMPLEMENTED` errors, by covering all combination of all params and operators in physical plan. TODO #4190 --- .devcontainer/devcontainer.json | 2 +- .github/workflows/interactive.yml | 15 +--- flex/CMakeLists.txt | 1 + flex/bin/adhoc_runner.cc | 33 ++++++- flex/engines/graph_db/app/adhoc_app.cc | 51 ++++++++++- .../graph_db/database/graph_db_session.cc | 18 +++- flex/engines/graph_db/runtime/CMakeLists.txt | 1 + .../graph_db/runtime/adhoc/operators/dedup.cc | 4 +- .../runtime/adhoc/operators/edge_expand.cc | 19 ++-- .../graph_db/runtime/adhoc/operators/get_v.cc | 9 +- .../runtime/adhoc/operators/group_by.cc | 29 +++++-- .../runtime/adhoc/operators/intersect.cc | 6 +- .../graph_db/runtime/adhoc/operators/join.cc | 12 ++- .../graph_db/runtime/adhoc/operators/limit.cc | 2 +- .../runtime/adhoc/operators/operators.h | 75 ++++++++-------- .../runtime/adhoc/operators/order_by.cc | 4 +- .../runtime/adhoc/operators/path_expand.cc | 29 ++++--- .../runtime/adhoc/operators/project.cc | 8 +- .../graph_db/runtime/adhoc/operators/scan.cc | 11 ++- .../runtime/adhoc/operators/select.cc | 6 +- .../engines/graph_db/runtime/adhoc/runtime.cc | 86 ++++++++++++------- flex/engines/graph_db/runtime/adhoc/runtime.h | 10 ++- .../graph_db/runtime/common/leaf_utils.h | 36 ++++++++ .../runtime/common/operators/edge_expand.cc | 57 ++++++++---- .../runtime/common/operators/edge_expand.h | 64 +++++++++----- .../graph_db/runtime/common/operators/get_v.h | 42 +++++---- .../runtime/common/operators/intersect.cc | 22 +++-- .../runtime/common/operators/intersect.h | 9 +- .../graph_db/runtime/common/operators/join.cc | 12 ++- .../graph_db/runtime/common/operators/join.h | 5 +- .../runtime/common/operators/path_expand.cc | 22 +++-- .../runtime/common/operators/path_expand.h | 23 +++-- .../runtime/common/operators/project.h | 9 +- .../graph_db/runtime/common/operators/scan.cc | 11 ++- .../graph_db/runtime/common/operators/scan.h | 14 +-- flex/engines/graph_db/runtime/common/types.h | 33 +++++++ .../handler/graph_db_http_handler.cc | 2 +- flex/engines/http_server/types.h | 3 +- flex/scripts/install_dependencies.sh | 11 ++- flex/utils/property/types.cc | 50 ++++++++++- flex/utils/property/types.h | 1 + flex/utils/result.h | 6 ++ flex/utils/service_utils.cc | 7 -- flex/utils/service_utils.h | 11 --- 44 files changed, 613 insertions(+), 268 deletions(-) create mode 100644 flex/engines/graph_db/runtime/common/leaf_utils.h diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 4e5ffa8b324d..282bab918a11 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -4,7 +4,7 @@ { "name": "GraphScope", // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile - "image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:latest", + "image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64", // Features to add to the dev container. More info: https://containers.dev/features. "features": { diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 7c5610e28374..7fbd1cca6abe 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -28,7 +28,7 @@ jobs: runs-on: ubuntu-20.04 if: ${{ github.repository == 'alibaba/GraphScope' }} container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64 steps: - uses: actions/checkout@v4 @@ -121,7 +121,6 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | rm -rf ${TMP_INTERACTIVE_WORKSPACE} cd ${GITHUB_WORKSPACE}/flex/build/ @@ -137,7 +136,6 @@ jobs: env: FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | cd ${GITHUB_WORKSPACE}/flex/interactive/sdk/ @@ -210,7 +208,6 @@ jobs: TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace PLUGIN_DIR: /tmp/temp_workspace/data/modern_graph/plugins FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph - LD_LIBRARY_PATH: /usr/local/lib run: | rm -rf ${TMP_INTERACTIVE_WORKSPACE} cd ${GITHUB_WORKSPACE}/flex/build/ @@ -271,7 +268,6 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest HOME : /home/graphscope/ INTERACTIVE_WORKSPACE: /tmp/interactive_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | cd ${GITHUB_WORKSPACE}/flex/tests/hqps/ export ENGINE_TYPE=hiactor @@ -285,7 +281,6 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest HOME : /home/graphscope/ INTERACTIVE_WORKSPACE: /tmp/interactive_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | cd ${GITHUB_WORKSPACE}/flex/tests/hqps/ export ENGINE_TYPE=hiactor @@ -299,7 +294,6 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest HOME : /home/graphscope/ INTERACTIVE_WORKSPACE: /tmp/interactive_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | cd ${GITHUB_WORKSPACE}/flex/tests/hqps/ export ENGINE_TYPE=hiactor @@ -313,7 +307,6 @@ jobs: GS_TEST_DIR: ${{ github.workspace }}/gstest HOME : /home/graphscope/ INTERACTIVE_WORKSPACE: /tmp/interactive_workspace - LD_LIBRARY_PATH: /usr/local/lib run: | cd ${GITHUB_WORKSPACE}/flex/tests/hqps/ export ENGINE_TYPE=hiactor @@ -353,7 +346,7 @@ jobs: test-cmake-options: runs-on: ubuntu-20.04 container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64 strategy: matrix: BUILD_TEST: [ON, OFF] @@ -373,7 +366,7 @@ jobs: test-AOCC-compilation: runs-on: ubuntu-20.04 container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64 steps: - uses: actions/checkout@v4 @@ -402,7 +395,7 @@ jobs: runs-on: ubuntu-20.04 if: ${{ github.repository == 'alibaba/GraphScope' }} container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64 steps: - uses: actions/checkout@v4 diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index f0a01900c838..a5d92f829bdc 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -125,6 +125,7 @@ find_package(Boost REQUIRED COMPONENTS system filesystem # required by folly context program_options regex thread date_time) add_definitions("-DBOOST_BIND_GLOBAL_PLACEHOLDERS") +include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) #find hiactor---------------------------------------------------------------------- find_package(Hiactor) diff --git a/flex/bin/adhoc_runner.cc b/flex/bin/adhoc_runner.cc index 88f49b3f20fb..976dac8b51f4 100644 --- a/flex/bin/adhoc_runner.cc +++ b/flex/bin/adhoc_runner.cc @@ -23,6 +23,7 @@ #include "flex/engines/graph_db/runtime/adhoc/runtime.h" namespace bpo = boost::program_options; +namespace bl = boost::leaf; std::string read_pb(const std::string& filename) { std::ifstream file(filename, std::ios::binary); @@ -74,6 +75,32 @@ void load_params(const std::string& filename, } } +gs::runtime::Context eval_plan( + const physical::PhysicalPlan& plan, gs::ReadTransaction& txn, + const std::map& params) { + gs::runtime::Context ctx; + { + ctx = bl::try_handle_all( + [&plan, &txn, ¶ms]() { + return gs::runtime::runtime_eval(plan, txn, params); + }, + [&ctx](const gs::Status& err) { + LOG(FATAL) << "Error in execution: " << err.error_message(); + return ctx; + }, + [&](const bl::error_info& err) { + LOG(FATAL) << "boost leaf error: " << err.error().value() << ", " + << err.exception()->what(); + return ctx; + }, + [&]() { + LOG(FATAL) << "Unknown error in execution"; + return ctx; + }); + } + return ctx; +} + int main(int argc, char** argv) { bpo::options_description desc("Usage:"); desc.add_options()("help", "Display help message")( @@ -158,7 +185,7 @@ int main(int argc, char** argv) { double t1 = -grape::GetCurrentTime(); for (int i = 0; i < query_num; ++i) { auto& m = map[i % params_num]; - auto ctx = gs::runtime::runtime_eval(pb, txn, m); + auto ctx = eval_plan(pb, txn, m); gs::Encoder output(outputs[i]); gs::runtime::eval_sink(ctx, txn, output); } @@ -167,7 +194,7 @@ int main(int argc, char** argv) { double t2 = -grape::GetCurrentTime(); for (int i = 0; i < query_num; ++i) { auto& m = map[i % params_num]; - auto ctx = gs::runtime::runtime_eval(pb, txn, m); + auto ctx = eval_plan(pb, txn, m); outputs[i].clear(); gs::Encoder output(outputs[i]); gs::runtime::eval_sink(ctx, txn, output); @@ -177,7 +204,7 @@ int main(int argc, char** argv) { double t3 = -grape::GetCurrentTime(); for (int i = 0; i < query_num; ++i) { auto& m = map[i % params_num]; - auto ctx = gs::runtime::runtime_eval(pb, txn, m); + auto ctx = eval_plan(pb, txn, m); outputs[i].clear(); gs::Encoder output(outputs[i]); gs::runtime::eval_sink(ctx, txn, output); diff --git a/flex/engines/graph_db/app/adhoc_app.cc b/flex/engines/graph_db/app/adhoc_app.cc index e8dbb626c80e..a500c15cb76c 100644 --- a/flex/engines/graph_db/app/adhoc_app.cc +++ b/flex/engines/graph_db/app/adhoc_app.cc @@ -1,10 +1,27 @@ -#include "flex/engines/graph_db/app/adhoc_app.h" +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flex/engines/graph_db/app/adhoc_app.h" #include "flex/engines/graph_db/runtime/adhoc/operators/operators.h" #include "flex/engines/graph_db/runtime/adhoc/runtime.h" - #include "flex/proto_generated_gie/physical.pb.h" +#include + +namespace bl = boost::leaf; + namespace gs { bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input, @@ -20,7 +37,35 @@ bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input, LOG(INFO) << "plan: " << plan.DebugString(); - auto ctx = runtime::runtime_eval(plan, txn, {}); + gs::runtime::Context ctx; + gs::Status status = gs::Status::OK(); + { + ctx = bl::try_handle_all( + [&plan, &txn]() { return runtime::runtime_eval(plan, txn, {}); }, + [&ctx, &status](const gs::Status& err) { + status = err; + return ctx; + }, + [&](const bl::error_info& err) { + status = gs::Status( + gs::StatusCode::INTERNAL_ERROR, + "BOOST LEAF Error: " + std::to_string(err.error().value()) + + ", Exception: " + err.exception()->what()); + return ctx; + }, + [&]() { + status = gs::Status(gs::StatusCode::UNKNOWN, "Unknown error"); + return ctx; + }); + } + + if (!status.ok()) { + LOG(ERROR) << "Error: " << status.ToString(); + // We encode the error message to the output, so that the client can + // get the error message. + output.put_string(status.ToString()); + return false; + } runtime::eval_sink(ctx, txn, output); diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 49712fdeecd2..634e73eb2786 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -166,10 +166,20 @@ Result> GraphDBSession::Eval(const std::string& input) { std::chrono::duration_cast(end - start) .count()); ++query_num_; - return Result>( - StatusCode::QUERY_FAILED, - "Query failed for procedure id:" + std::to_string((int) type), - result_buffer); + // When query failed, we assume the user may put the error message in the + // output buffer. + // For example, for adhoc_app.cc, if the query failed, the error info will + // be put in the output buffer. + if (result_buffer.size() > 0) { + return Result>( + StatusCode::QUERY_FAILED, + std::string{result_buffer.data(), result_buffer.size()}, result_buffer); + } else { + return Result>( + StatusCode::QUERY_FAILED, + "Query failed for procedure id:" + std::to_string((int) type), + result_buffer); + } } void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); } diff --git a/flex/engines/graph_db/runtime/CMakeLists.txt b/flex/engines/graph_db/runtime/CMakeLists.txt index 40e05ab80e78..8868a143501a 100644 --- a/flex/engines/graph_db/runtime/CMakeLists.txt +++ b/flex/engines/graph_db/runtime/CMakeLists.txt @@ -6,6 +6,7 @@ install_flex_target(runtime_common) file(GLOB_RECURSE ADHOC_SOURCES "adhoc/*.cc") add_library(runtime_adhoc SHARED ${ADHOC_SOURCES}) target_link_libraries(runtime_adhoc runtime_common) +target_link_libraries(runtime_adhoc Boost::headers) install_flex_target(runtime_adhoc) diff --git a/flex/engines/graph_db/runtime/adhoc/operators/dedup.cc b/flex/engines/graph_db/runtime/adhoc/operators/dedup.cc index 8f3fcf92aae7..0e5470b8fce4 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/dedup.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/dedup.cc @@ -21,8 +21,8 @@ namespace gs { namespace runtime { -Context eval_dedup(const algebra::Dedup& opr, const ReadTransaction& txn, - Context&& ctx) { +bl::result eval_dedup(const algebra::Dedup& opr, + const ReadTransaction& txn, Context&& ctx) { std::vector keys; std::vector> vars; int keys_num = opr.keys_size(); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc b/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc index d8d82d5e7302..c0e75d69cd40 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc @@ -23,10 +23,10 @@ namespace gs { namespace runtime { -Context eval_edge_expand(const physical::EdgeExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta) { +bl::result eval_edge_expand( + const physical::EdgeExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta) { int v_tag; if (!opr.has_v_tag()) { v_tag = -1; @@ -49,7 +49,9 @@ Context eval_edge_expand(const physical::EdgeExpand& opr, if (opr.expand_opt() == physical::EdgeExpand_ExpandOpt::EdgeExpand_ExpandOpt_VERTEX) { if (query_params.has_predicate()) { - LOG(FATAL) << "not support"; + LOG(ERROR) << "edge expand vertex with predicate is not supported"; + RETURN_UNSUPPORTED_ERROR( + "edge expand vertex with predicate is not supported"); } else { EdgeExpandParams eep; eep.v_tag = v_tag; @@ -82,7 +84,12 @@ Context eval_edge_expand(const physical::EdgeExpand& opr, eep); } } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "EdgeExpand with expand_opt: " << opr.expand_opt() + << " is " + "not supported"; + RETURN_UNSUPPORTED_ERROR( + "EdgeExpand with expand_opt is not supported: " + + std::to_string(static_cast(opr.expand_opt()))); } return ctx; } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc b/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc index aec1fd39d6ff..c1e18a10fc6a 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc @@ -39,9 +39,9 @@ VOpt parse_opt(const physical::GetV_VOpt& opt) { } } -Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params) { +bl::result eval_get_v( + const physical::GetV& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params) { int tag = -1; if (opr.has_tag()) { tag = opr.tag().value(); @@ -76,7 +76,8 @@ Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn, } } - LOG(FATAL) << "not support"; + LOG(ERROR) << "Unsupported GetV operation: " << opr.DebugString(); + RETURN_UNSUPPORTED_ERROR("Unsupported GetV operation: " + opr.DebugString()); return ctx; } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc index c318691d2066..14f070759c1e 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc @@ -18,6 +18,7 @@ #include "flex/engines/graph_db/runtime/adhoc/var.h" #include "flex/engines/graph_db/runtime/common/columns/value_columns.h" #include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" namespace gs { @@ -412,7 +413,7 @@ std::shared_ptr string_to_list( return builder.finish(); } -std::shared_ptr apply_reduce( +bl::result> apply_reduce( const AggFunc& func, const std::vector>& to_aggregate) { if (func.aggregate == AggrKind::kSum) { if (func.vars.size() != 1) { @@ -422,8 +423,12 @@ std::shared_ptr apply_reduce( if (var.type() == RTAnyType::kI32Value) { return numeric_sum(var, to_aggregate); } else { - LOG(FATAL) << "reduce on " << static_cast(var.type().type_enum_) + LOG(ERROR) << "reduce on " << static_cast(var.type().type_enum_) << " is not supported..."; + RETURN_UNSUPPORTED_ERROR( + "reduce on " + + std::to_string(static_cast(var.type().type_enum_)) + + " is not supported..."); } } else if (func.aggregate == AggrKind::kToSet) { if (func.vars.size() != 1) { @@ -433,7 +438,12 @@ std::shared_ptr apply_reduce( if (var.type() == RTAnyType::kStringValue) { return string_to_set(var, to_aggregate); } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "reduce on " << static_cast(var.type().type_enum_) + << " is not supported..."; + RETURN_UNSUPPORTED_ERROR( + "reduce on " + + std::to_string(static_cast(var.type().type_enum_)) + + " is not supported..."); } } else if (func.aggregate == AggrKind::kCountDistinct) { if (func.vars.size() == 1 && func.vars[0].type() == RTAnyType::kVertex) { @@ -513,12 +523,15 @@ std::shared_ptr apply_reduce( } } - LOG(FATAL) << "unsupport " << static_cast(func.aggregate); + LOG(ERROR) << "Unsupported aggregate function " + << static_cast(func.aggregate); + RETURN_UNSUPPORTED_ERROR("Unsupported aggregate function " + + std::to_string(static_cast(func.aggregate))); return nullptr; } -Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn, - Context&& ctx) { +bl::result eval_group_by(const physical::GroupBy& opr, + const ReadTransaction& txn, Context&& ctx) { std::vector functions; std::vector mappings; int func_num = opr.functions_size(); @@ -535,7 +548,7 @@ Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn, for (size_t _i = 0; _i < ctx.row_num(); ++_i) { tmp.emplace_back(_i); } - auto new_col = apply_reduce(functions[i], {tmp}); + BOOST_LEAF_AUTO(new_col, apply_reduce(functions[i], {tmp})); ret.set(functions[i].alias, new_col); ret.append_tag_id(functions[i].alias); } @@ -569,7 +582,7 @@ Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn, } for (int i = 0; i < func_num; ++i) { - auto new_col = apply_reduce(functions[i], to_aggregate); + BOOST_LEAF_AUTO(new_col, apply_reduce(functions[i], to_aggregate)); ret.set(functions[i].alias, new_col); ret.append_tag_id(functions[i].alias); } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/intersect.cc b/flex/engines/graph_db/runtime/adhoc/operators/intersect.cc index 083875309f1e..0129b2040505 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/intersect.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/intersect.cc @@ -20,9 +20,9 @@ namespace gs { namespace runtime { -Context eval_intersect(const ReadTransaction& txn, - const physical::Intersect& opr, - std::vector&& ctxs) { +bl::result eval_intersect(const ReadTransaction& txn, + const physical::Intersect& opr, + std::vector&& ctxs) { int32_t key = opr.key(); if (ctxs.size() == 1) { return std::move(ctxs[0]); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/join.cc b/flex/engines/graph_db/runtime/adhoc/operators/join.cc index 96a6a6dea3b6..613451173562 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/join.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/join.cc @@ -18,19 +18,22 @@ namespace gs { namespace runtime { -Context eval_join(const physical::Join& opr, Context&& ctx, Context&& ctx2) { +bl::result eval_join(const physical::Join& opr, Context&& ctx, + Context&& ctx2) { JoinParams p; auto left_keys = opr.left_keys(); for (int i = 0; i < left_keys.size(); i++) { if (!left_keys.Get(i).has_tag()) { - LOG(FATAL) << "left_keys should have tag"; + LOG(ERROR) << "left_keys should have tag"; + RETURN_BAD_REQUEST_ERROR("left_keys should have tag"); } p.left_columns.push_back(left_keys.Get(i).tag().id()); } auto right_keys = opr.right_keys(); for (int i = 0; i < right_keys.size(); i++) { if (!right_keys.Get(i).has_tag()) { - LOG(FATAL) << "right_keys should have tag"; + LOG(ERROR) << "right_keys should have tag"; + RETURN_BAD_REQUEST_ERROR("right_keys should have tag"); } p.right_columns.push_back(right_keys.Get(i).tag().id()); } @@ -48,7 +51,8 @@ Context eval_join(const physical::Join& opr, Context&& ctx, Context&& ctx2) { p.join_type = JoinKind::kLeftOuterJoin; break; default: - LOG(FATAL) << "unsupported join kind" << opr.join_kind(); + RETURN_UNSUPPORTED_ERROR("Unsupported join kind: " + + std::to_string(static_cast(opr.join_kind()))); } return Join::join(std::move(ctx), std::move(ctx2), p); } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/limit.cc b/flex/engines/graph_db/runtime/adhoc/operators/limit.cc index 0fe37a290229..21cf1008abdc 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/limit.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/limit.cc @@ -19,7 +19,7 @@ namespace gs { namespace runtime { -Context eval_limit(const algebra::Limit& opr, Context&& ctx) { +bl::result eval_limit(const algebra::Limit& opr, Context&& ctx) { int lower = 0; int upper = ctx.row_num(); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/operators.h b/flex/engines/graph_db/runtime/adhoc/operators/operators.h index d72c99ce2111..395d94acae9c 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/operators.h +++ b/flex/engines/graph_db/runtime/adhoc/operators/operators.h @@ -21,61 +21,62 @@ #include "flex/engines/graph_db/database/read_transaction.h" #include "flex/engines/graph_db/runtime/common/context.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" #include "flex/utils/app_utils.h" namespace gs { namespace runtime { -Context eval_dedup(const algebra::Dedup& opr, const ReadTransaction& txn, - Context&& ctx); +bl::result eval_dedup(const algebra::Dedup& opr, + const ReadTransaction& txn, Context&& ctx); -Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn, - Context&& ctx); +bl::result eval_group_by(const physical::GroupBy& opr, + const ReadTransaction& txn, Context&& ctx); -Context eval_order_by(const algebra::OrderBy& opr, const ReadTransaction& txn, - Context&& ctx); +bl::result eval_order_by(const algebra::OrderBy& opr, + const ReadTransaction& txn, Context&& ctx); -Context eval_path_expand_v(const physical::PathExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta, - int alias); +bl::result eval_path_expand_v( + const physical::PathExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta, int alias); -Context eval_path_expand_p(const physical::PathExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta, - int alias); +bl::result eval_path_expand_p( + const physical::PathExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta, int alias); -Context eval_project(const physical::Project& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params, - const std::vector& data_types); +bl::result eval_project( + const physical::Project& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const std::vector& data_types); -Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn, - const std::map& params); +bl::result eval_scan(const physical::Scan& scan_opr, + const ReadTransaction& txn, + const std::map& params); -Context eval_select(const algebra::Select& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params); +bl::result eval_select( + const algebra::Select& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params); -Context eval_edge_expand(const physical::EdgeExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta); +bl::result eval_edge_expand( + const physical::EdgeExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta); -Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params); +bl::result eval_get_v( + const physical::GetV& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params); -Context eval_intersect(const ReadTransaction& txn, - const physical::Intersect& opr, - std::vector&& ctx); +bl::result eval_intersect(const ReadTransaction& txn, + const physical::Intersect& opr, + std::vector&& ctx); -Context eval_join(const physical::Join& opr, Context&& ctx, Context&& ctx2); +bl::result eval_join(const physical::Join& opr, Context&& ctx, + Context&& ctx2); -Context eval_limit(const algebra::Limit& opr, Context&& ctx); +bl::result eval_limit(const algebra::Limit& opr, Context&& ctx); void eval_sink(const Context& ctx, const ReadTransaction& txn, Encoder& output); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc b/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc index 88e042ec55b9..2e85ef0607f1 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc @@ -57,8 +57,8 @@ class GeneralComparer { size_t keys_num_; }; -Context eval_order_by(const algebra::OrderBy& opr, const ReadTransaction& txn, - Context&& ctx) { +bl::result eval_order_by(const algebra::OrderBy& opr, + const ReadTransaction& txn, Context&& ctx) { int lower = 0; int upper = std::numeric_limits::max(); if (opr.has_limit()) { diff --git a/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc b/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc index f374c0261f1a..5b2ba7e244cf 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc @@ -21,11 +21,10 @@ namespace gs { namespace runtime { -Context eval_path_expand_v(const physical::PathExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta, - int alias) { +bl::result eval_path_expand_v( + const physical::PathExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta, int alias) { CHECK(opr.has_start_tag()); int start_tag = opr.start_tag().value(); CHECK(opr.path_opt() == @@ -54,22 +53,24 @@ Context eval_path_expand_v(const physical::PathExpand& opr, if (opr.base().edge_expand().expand_opt() == physical::EdgeExpand_ExpandOpt::EdgeExpand_ExpandOpt_VERTEX) { if (query_params.has_predicate()) { - LOG(FATAL) << "not support"; + LOG(ERROR) << "path expand vertex with predicate is not supported"; + RETURN_UNSUPPORTED_ERROR( + "path expand vertex with predicate is not supported"); } else { return PathExpand::edge_expand_v(txn, std::move(ctx), pep); } } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Currently only support edge expand to vertex"; + RETURN_UNSUPPORTED_ERROR("Currently only support edge expand to vertex"); } return ctx; } -Context eval_path_expand_p(const physical::PathExpand& opr, - const ReadTransaction& txn, Context&& ctx, - const std::map& params, - const physical::PhysicalOpr_MetaData& meta, - int alias) { +bl::result eval_path_expand_p( + const physical::PathExpand& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const physical::PhysicalOpr_MetaData& meta, int alias) { CHECK(opr.has_start_tag()); int start_tag = opr.start_tag().value(); CHECK(opr.path_opt() == @@ -94,7 +95,9 @@ Context eval_path_expand_p(const physical::PathExpand& opr, pep.labels = parse_label_triplets(meta); if (query_params.has_predicate()) { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Currently can not support predicate in path expand"; + RETURN_UNSUPPORTED_ERROR( + "Currently can not support predicate in path expand"); } else { return PathExpand::edge_expand_p(txn, std::move(ctx), pep); } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/project.cc b/flex/engines/graph_db/runtime/adhoc/operators/project.cc index 371faa2531fe..3c372aef709c 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/project.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/project.cc @@ -42,10 +42,10 @@ bool exchange_tag_alias(const physical::Project_ExprAlias& m, int& tag, return false; } -Context eval_project(const physical::Project& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params, - const std::vector& data_types) { +bl::result eval_project( + const physical::Project& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params, + const std::vector& data_types) { bool is_append = opr.is_append(); Context ret; if (is_append) { diff --git a/flex/engines/graph_db/runtime/adhoc/operators/scan.cc b/flex/engines/graph_db/runtime/adhoc/operators/scan.cc index 33ddd9f2cd60..6bb21e0c43bc 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/scan.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/scan.cc @@ -230,8 +230,9 @@ bool parse_idx_predicate(const algebra::IndexPredicate& predicate, return true; } -Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn, - const std::map& params) { +bl::result eval_scan( + const physical::Scan& scan_opr, const ReadTransaction& txn, + const std::map& params) { label_t label; int64_t vertex_id; int alias; @@ -260,7 +261,8 @@ Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn, scan_params.tables.push_back(table.id()); const auto& pks = txn.schema().get_vertex_primary_key(table.id()); if (pks.size() > 1) { - LOG(FATAL) << "only support one primary key"; + LOG(ERROR) << "only support one primary key"; + RETURN_UNSUPPORTED_ERROR("only support one primary key"); } auto [type, _, __] = pks[0]; if (type != PropertyType::kInt64) { @@ -379,8 +381,9 @@ Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn, [](label_t, vid_t) { return true; }); } } - LOG(FATAL) << "unsupport scan option " << scan_opr.DebugString() + LOG(ERROR) << "unsupport scan option " << scan_opr.DebugString() << " we only support scan vertex currently"; + RETURN_UNSUPPORTED_ERROR("unsupport scan option " + scan_opr.DebugString()); return Context(); } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/select.cc b/flex/engines/graph_db/runtime/adhoc/operators/select.cc index 13b78e674ccb..0d786c224789 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/select.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/select.cc @@ -20,9 +20,9 @@ namespace gs { namespace runtime { -Context eval_select(const algebra::Select& opr, const ReadTransaction& txn, - Context&& ctx, - const std::map& params) { +bl::result eval_select( + const algebra::Select& opr, const ReadTransaction& txn, Context&& ctx, + const std::map& params) { Expr expr(txn, ctx, params, opr.predicate(), VarType::kPathVar); std::vector offsets; size_t row_num = ctx.row_num(); diff --git a/flex/engines/graph_db/runtime/adhoc/runtime.cc b/flex/engines/graph_db/runtime/adhoc/runtime.cc index 2405dc805cbc..48f35b9b2992 100644 --- a/flex/engines/graph_db/runtime/adhoc/runtime.cc +++ b/flex/engines/graph_db/runtime/adhoc/runtime.cc @@ -15,6 +15,8 @@ #include "flex/engines/graph_db/runtime/adhoc/runtime.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { @@ -80,9 +82,10 @@ static std::string get_opr_name(const physical::PhysicalOpr& opr) { } } -Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, - const ReadTransaction& txn, - const std::map& params) { +bl::result runtime_eval_impl( + const physical::PhysicalPlan& plan, Context&& ctx, + const ReadTransaction& txn, + const std::map& params) { Context ret = ctx; auto& op_cost = OpCost::get().table; @@ -96,19 +99,21 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, switch (opr.opr().op_kind_case()) { LOG(INFO) << "eval: " << get_opr_name(opr); case physical::PhysicalOpr_Operator::OpKindCase::kScan: { - ret = eval_scan(opr.opr().scan(), txn, params); + BOOST_LEAF_ASSIGN(ret, eval_scan(opr.opr().scan(), txn, params)); t += grape::GetCurrentTime(); op_cost["scan"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kEdge: { CHECK_EQ(opr.meta_data_size(), 1); - ret = eval_edge_expand(opr.opr().edge(), txn, std::move(ret), params, - opr.meta_data(0)); + BOOST_LEAF_ASSIGN( + ret, eval_edge_expand(opr.opr().edge(), txn, std::move(ret), params, + opr.meta_data(0))); t += grape::GetCurrentTime(); op_cost["edge_expand"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kVertex: { - ret = eval_get_v(opr.opr().vertex(), txn, std::move(ret), params); + BOOST_LEAF_ASSIGN( + ret, eval_get_v(opr.opr().vertex(), txn, std::move(ret), params)); t += grape::GetCurrentTime(); op_cost["get_v"] += t; } break; @@ -123,28 +128,32 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, data_types.push_back(opr.meta_data(i).type()); } } - ret = eval_project(opr.opr().project(), txn, std::move(ret), params, - data_types); + BOOST_LEAF_ASSIGN(ret, eval_project(opr.opr().project(), txn, + std::move(ret), params, data_types)); t += grape::GetCurrentTime(); op_cost["project"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kOrderBy: { - ret = eval_order_by(opr.opr().order_by(), txn, std::move(ret)); + BOOST_LEAF_ASSIGN( + ret, eval_order_by(opr.opr().order_by(), txn, std::move(ret))); t += grape::GetCurrentTime(); op_cost["order_by"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kGroupBy: { - ret = eval_group_by(opr.opr().group_by(), txn, std::move(ret)); + BOOST_LEAF_ASSIGN( + ret, eval_group_by(opr.opr().group_by(), txn, std::move(ret))); t += grape::GetCurrentTime(); op_cost["group_by"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kDedup: { - ret = eval_dedup(opr.opr().dedup(), txn, std::move(ret)); + BOOST_LEAF_ASSIGN(ret, + eval_dedup(opr.opr().dedup(), txn, std::move(ret))); t += grape::GetCurrentTime(); op_cost["dedup"] += t; } break; case physical::PhysicalOpr_Operator::OpKindCase::kSelect: { - ret = eval_select(opr.opr().select(), txn, std::move(ret), params); + BOOST_LEAF_ASSIGN( + ret, eval_select(opr.opr().select(), txn, std::move(ret), params)); t += grape::GetCurrentTime(); op_cost["select"] += t; } break; @@ -160,19 +169,23 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, if (next_opr.opr().vertex().has_alias()) { alias = next_opr.opr().vertex().alias().value(); } - ret = eval_path_expand_v(opr.opr().path(), txn, std::move(ret), - params, opr.meta_data(0), alias); + BOOST_LEAF_ASSIGN( + ret, eval_path_expand_v(opr.opr().path(), txn, std::move(ret), + params, opr.meta_data(0), alias)); ++i; } else { int alias = -1; if (opr.opr().path().has_alias()) { alias = opr.opr().path().alias().value(); } - ret = eval_path_expand_p(opr.opr().path(), txn, std::move(ret), - params, opr.meta_data(0), alias); + BOOST_LEAF_ASSIGN( + ret, eval_path_expand_p(opr.opr().path(), txn, std::move(ret), + params, opr.meta_data(0), alias)); } } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Path Expand to Path is currently not supported"; + RETURN_UNSUPPORTED_ERROR( + "Path Expand to Path is currently not supported"); } t += grape::GetCurrentTime(); op_cost["path_expand"] += t; @@ -186,10 +199,11 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, case physical::PhysicalOpr_Operator::OpKindCase::kJoin: { auto op = opr.opr().join(); auto ret_dup = ret.dup(); - auto ctx = runtime_eval_impl(op.left_plan(), std::move(ret), txn, params); - auto ctx2 = - runtime_eval_impl(op.right_plan(), std::move(ret_dup), txn, params); - ret = eval_join(op, std::move(ctx), std::move(ctx2)); + BOOST_LEAF_AUTO( + ctx, runtime_eval_impl(op.left_plan(), std::move(ret), txn, params)); + BOOST_LEAF_AUTO(ctx2, runtime_eval_impl(op.right_plan(), + std::move(ret_dup), txn, params)); + BOOST_LEAF_ASSIGN(ret, eval_join(op, std::move(ctx), std::move(ctx2))); } break; case physical::PhysicalOpr_Operator::OpKindCase::kIntersect: { @@ -200,22 +214,28 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, for (size_t i = 0; i < num; ++i) { if (i + 1 < num) { auto ret_dup = ret.dup(); - ctxs.push_back(runtime_eval_impl(op.sub_plans(i), std::move(ret_dup), - txn, params)); + BOOST_LEAF_AUTO( + ctx, runtime_eval_impl(op.sub_plans(i), std::move(ret_dup), txn, + params)); + ctxs.push_back(std::move(ctx)); } else { - ctxs.push_back( - runtime_eval_impl(op.sub_plans(i), std::move(ret), txn, params)); + BOOST_LEAF_AUTO(ctx, runtime_eval_impl(op.sub_plans(i), + std::move(ret), txn, params)); + ctxs.push_back(std::move(ctx)); } } - ret = eval_intersect(txn, op, std::move(ctxs)); + BOOST_LEAF_ASSIGN(ret, eval_intersect(txn, op, std::move(ctxs))); } break; case physical::PhysicalOpr_Operator::OpKindCase::kLimit: { - ret = eval_limit(opr.opr().limit(), std::move(ret)); + BOOST_LEAF_ASSIGN(ret, eval_limit(opr.opr().limit(), std::move(ret))); } break; default: - LOG(FATAL) << "opr not support..." << get_opr_name(opr) - << opr.DebugString(); + LOG(ERROR) << "Unknown operator type: " + << static_cast(opr.opr().op_kind_case()); + RETURN_UNSUPPORTED_ERROR( + "Unknown operator type: " + + std::to_string(static_cast(opr.opr().op_kind_case()))); break; } if (terminate) { @@ -225,9 +245,9 @@ Context runtime_eval_impl(const physical::PhysicalPlan& plan, Context&& ctx, return ret; } -Context runtime_eval(const physical::PhysicalPlan& plan, - const ReadTransaction& txn, - const std::map& params) { +bl::result runtime_eval( + const physical::PhysicalPlan& plan, const ReadTransaction& txn, + const std::map& params) { return runtime_eval_impl(plan, Context(), txn, params); } diff --git a/flex/engines/graph_db/runtime/adhoc/runtime.h b/flex/engines/graph_db/runtime/adhoc/runtime.h index 9d9ccf5fd1be..9f2e93aa204f 100644 --- a/flex/engines/graph_db/runtime/adhoc/runtime.h +++ b/flex/engines/graph_db/runtime/adhoc/runtime.h @@ -18,13 +18,17 @@ #include "flex/engines/graph_db/runtime/adhoc/operators/operators.h" #include "flex/proto_generated_gie/physical.pb.h" +#include "boost/leaf.hpp" + +namespace bl = boost::leaf; + namespace gs { namespace runtime { -Context runtime_eval(const physical::PhysicalPlan& plan, - const ReadTransaction& txn, - const std::map& params); +bl::result runtime_eval( + const physical::PhysicalPlan& plan, const ReadTransaction& txn, + const std::map& params); } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/leaf_utils.h b/flex/engines/graph_db/runtime/common/leaf_utils.h new file mode 100644 index 000000000000..d7bbe69cba15 --- /dev/null +++ b/flex/engines/graph_db/runtime/common/leaf_utils.h @@ -0,0 +1,36 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RUNTIME_COMMON_LEAF_UTILS_H_ +#define RUNTIME_COMMON_LEAF_UTILS_H_ + +#include +#include "flex/utils/result.h" + +namespace bl = boost::leaf; + +#define RETURN_UNSUPPORTED_ERROR(msg) \ + return ::boost::leaf::new_error( \ + ::gs::Status(::gs::StatusCode::UNSUPPORTED_OPERATION, msg)) + +#define RETURN_BAD_REQUEST_ERROR(msg) \ + return ::boost::leaf::new_error( \ + ::gs::Status(::gs::StatusCode::BAD_REQUEST, msg)) + +#define RETURN_NOT_IMPLEMENTED_ERROR(msg) \ + return ::boost::leaf::new_error( \ + ::gs::Status(::gs::StatusCode::UNIMPLEMENTED, msg)) + +#endif // RUNTIME_COMMON_LEAF_UTILS_H_ diff --git a/flex/engines/graph_db/runtime/common/operators/edge_expand.cc b/flex/engines/graph_db/runtime/common/operators/edge_expand.cc index b38da41e2570..d94f9017afa1 100644 --- a/flex/engines/graph_db/runtime/common/operators/edge_expand.cc +++ b/flex/engines/graph_db/runtime/common/operators/edge_expand.cc @@ -46,7 +46,7 @@ static std::vector get_expand_label_set( return label_triplets; } -Context EdgeExpand::expand_edge_without_predicate( +bl::result EdgeExpand::expand_edge_without_predicate( const ReadTransaction& txn, Context&& ctx, const EdgeExpandParams& params) { std::vector shuffle_offset; @@ -350,11 +350,12 @@ Context EdgeExpand::expand_edge_without_predicate( } } - LOG(FATAL) << "not support"; - return ctx; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } -Context EdgeExpand::expand_vertex_without_predicate( +bl::result EdgeExpand::expand_vertex_without_predicate( const ReadTransaction& txn, Context&& ctx, const EdgeExpandParams& params) { std::shared_ptr input_vertex_list = std::dynamic_pointer_cast(ctx.get(params.v_tag)); @@ -504,7 +505,8 @@ Context EdgeExpand::expand_vertex_without_predicate( ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); } else { - LOG(FATAL) << "xxx"; + LOG(ERROR) << "Unsupported edge expand direction"; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction"); } } else { MLVertexColumnBuilder builder; @@ -552,7 +554,8 @@ Context EdgeExpand::expand_vertex_without_predicate( auto casted_input_vertex_list = std::dynamic_pointer_cast(input_vertex_list); if (params.dir == Direction::kBoth) { - LOG(FATAL) << "AAAAAAAAA"; + LOG(ERROR) << "Unsupported edge expand direction"; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction"); } else if (params.dir == Direction::kIn) { casted_input_vertex_list->foreach_vertex( [&](size_t index, label_t label, vid_t v) { @@ -588,13 +591,17 @@ Context EdgeExpand::expand_vertex_without_predicate( }); ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); } else { - LOG(FATAL) << "xxx"; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } } else if (input_vertex_list_type == VertexColumnType::kMultiSegment) { auto casted_input_vertex_list = std::dynamic_pointer_cast(input_vertex_list); if (params.dir == Direction::kBoth) { - LOG(FATAL) << "AAAAAAAAA"; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } else if (params.dir == Direction::kIn) { casted_input_vertex_list->foreach_vertex( [&](size_t index, label_t label, vid_t v) { @@ -630,10 +637,13 @@ Context EdgeExpand::expand_vertex_without_predicate( }); ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); } else { - LOG(FATAL) << "xxx"; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } } else { - LOG(FATAL) << "unexpected input vertex list type"; + LOG(ERROR) << "unexpected input vertex list type"; + RETURN_UNSUPPORTED_ERROR("unexpected input vertex list type"); } } else { if (input_vertex_list_type == VertexColumnType::kSingle) { @@ -672,7 +682,9 @@ Context EdgeExpand::expand_vertex_without_predicate( for (label_t output_vertex_label : output_vertex_set) { builder.start_label(output_vertex_label); if (params.dir == Direction::kBoth) { - LOG(FATAL) << "AAAAA"; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } else if (params.dir == Direction::kIn) { for (auto& triplet : params.labels) { if (triplet.dst_label == input_vertex_label && @@ -691,7 +703,9 @@ Context EdgeExpand::expand_vertex_without_predicate( } } } else if (params.dir == Direction::kOut) { - LOG(FATAL) << "AAAAA"; + LOG(ERROR) << "Unsupported edge expand direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported edge expand direction" + + std::to_string(params.dir)); } } #endif @@ -754,16 +768,19 @@ Context EdgeExpand::expand_vertex_without_predicate( ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); return ctx; } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Unsupported edge expand direction: " + << static_cast(params.dir); } - LOG(FATAL) << "edge expand vertex input multiple vertex label"; + LOG(ERROR) << "edge expand vertex input multiple vertex label"; + RETURN_UNSUPPORTED_ERROR( + "edge expand vertex input multiple vertex label"); } } return ctx; } -Context EdgeExpand::expand_2d_vertex_without_predicate( +bl::result EdgeExpand::expand_2d_vertex_without_predicate( const ReadTransaction& txn, Context&& ctx, const EdgeExpandParams& params1, const EdgeExpandParams& params2) { std::shared_ptr input_vertex_list = @@ -824,9 +841,13 @@ Context EdgeExpand::expand_2d_vertex_without_predicate( } } } - LOG(FATAL) << "XXXX"; - - return ctx; + LOG(ERROR) << "Unsupported edge expand 2d vertex without predicate, " + << "params1.dir: " << static_cast(params1.dir) + << ", params2.dir: " << static_cast(params2.dir) + << ", params1.labels.size: " << params1.labels.size() + << ", params2.labels.size: " << params2.labels.size(); + RETURN_UNSUPPORTED_ERROR( + "Unsupported params for edge expand 2d vertex without predicate"); } } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/operators/edge_expand.h b/flex/engines/graph_db/runtime/common/operators/edge_expand.h index ed1e8bb7feeb..7ad9dfd442fa 100644 --- a/flex/engines/graph_db/runtime/common/operators/edge_expand.h +++ b/flex/engines/graph_db/runtime/common/operators/edge_expand.h @@ -22,6 +22,7 @@ #include "flex/engines/graph_db/runtime/common/columns/edge_columns.h" #include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h" #include "flex/engines/graph_db/runtime/common/context.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" #include "glog/logging.h" @@ -38,9 +39,10 @@ struct EdgeExpandParams { class EdgeExpand { public: template - static Context expand_edge(const ReadTransaction& txn, Context&& ctx, - const EdgeExpandParams& params, - const PRED_T& pred) { + static bl::result expand_edge(const ReadTransaction& txn, + Context&& ctx, + const EdgeExpandParams& params, + const PRED_T& pred) { std::vector shuffle_offset; if (params.labels.size() == 1) { if (params.dir == Direction::kIn) { @@ -124,7 +126,9 @@ class EdgeExpand { ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); return ctx; } else { - LOG(FATAL) << "expand edge both"; + LOG(ERROR) << "Unsupported direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported direction: " + + std::to_string(params.dir)); } } else { if (params.dir == Direction::kBoth) { @@ -219,19 +223,23 @@ class EdgeExpand { }); ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); return ctx; + } else { + LOG(ERROR) << "Unsupported direction: " << params.dir; + RETURN_UNSUPPORTED_ERROR("Unsupported direction" + + std::to_string(params.dir)); } } - LOG(FATAL) << "not support"; } - static Context expand_edge_without_predicate(const ReadTransaction& txn, - Context&& ctx, - const EdgeExpandParams& params); + static bl::result expand_edge_without_predicate( + const ReadTransaction& txn, Context&& ctx, + const EdgeExpandParams& params); template - static Context expand_vertex(const ReadTransaction& txn, Context&& ctx, - const EdgeExpandParams& params, - const PRED_T& pred) { + static bl::result expand_vertex(const ReadTransaction& txn, + Context&& ctx, + const EdgeExpandParams& params, + const PRED_T& pred) { std::shared_ptr input_vertex_list = std::dynamic_pointer_cast(ctx.get(params.v_tag)); VertexColumnType input_vertex_list_type = @@ -268,7 +276,8 @@ class EdgeExpand { } if (output_vertex_set.empty()) { - LOG(FATAL) << "output vertex label set is empty..."; + LOG(ERROR) << "No output vertex label found..."; + RETURN_UNSUPPORTED_ERROR("No output vertex label found..."); } std::vector shuffle_offset; @@ -335,13 +344,18 @@ class EdgeExpand { ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); } else { - LOG(FATAL) << "xxx, " << (int) params.dir; + LOG(ERROR) << "Unsupported direction and label triplet..."; + RETURN_UNSUPPORTED_ERROR( + "Unsupported direction and label triplet..."); } } else { - LOG(FATAL) << "multiple label triplet..."; + LOG(ERROR) << "multiple label triplet..."; + RETURN_UNSUPPORTED_ERROR("multiple label triplet..."); } } else { - LOG(FATAL) << "edge expand vertex input multiple vertex label"; + LOG(ERROR) << "edge expand vertex input multiple vertex label"; + RETURN_UNSUPPORTED_ERROR( + "edge expand vertex input multiple vertex label"); } } else { MLVertexColumnBuilder builder; @@ -352,7 +366,9 @@ class EdgeExpand { label_t input_vertex_label = casted_input_vertex_list->label(); for (label_t output_vertex_label : output_vertex_set) { if (params.dir == Direction::kBoth) { - LOG(FATAL) << "AAAAA"; + LOG(ERROR) << "expand vertex with both direction is not supported"; + RETURN_UNSUPPORTED_ERROR( + "expand vertex with both direction is not supported"); } else if (params.dir == Direction::kIn) { for (auto& triplet : params.labels) { if (triplet.dst_label == input_vertex_label && @@ -375,23 +391,31 @@ class EdgeExpand { } } } else if (params.dir == Direction::kOut) { - LOG(FATAL) << "AAAAA"; + LOG(ERROR) << "expand vertex with out direction is not supported"; + RETURN_UNSUPPORTED_ERROR( + "expand vertex with out direction is not supported"); + } else { + // Must be both + LOG(ERROR) << "Unknow direction"; + RETURN_UNSUPPORTED_ERROR("Unknow direction"); } } ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); } else { - LOG(FATAL) << "edge expand vertex input multiple vertex label"; + LOG(ERROR) << "edge expand vertex input multiple vertex label"; + RETURN_UNSUPPORTED_ERROR( + "edge expand vertex input multiple vertex label"); } } return ctx; } - static Context expand_vertex_without_predicate( + static bl::result expand_vertex_without_predicate( const ReadTransaction& txn, Context&& ctx, const EdgeExpandParams& params); - static Context expand_2d_vertex_without_predicate( + static bl::result expand_2d_vertex_without_predicate( const ReadTransaction& txn, Context&& ctx, const EdgeExpandParams& params1, const EdgeExpandParams& params2); }; diff --git a/flex/engines/graph_db/runtime/common/operators/get_v.h b/flex/engines/graph_db/runtime/common/operators/get_v.h index 6d9c2f05c6e1..6a509c825631 100644 --- a/flex/engines/graph_db/runtime/common/operators/get_v.h +++ b/flex/engines/graph_db/runtime/common/operators/get_v.h @@ -19,6 +19,7 @@ #include "flex/engines/graph_db/runtime/common/columns/path_columns.h" #include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h" #include "flex/engines/graph_db/runtime/common/context.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" namespace gs { namespace runtime { @@ -30,9 +31,9 @@ struct GetVParams { int alias; }; -std::vector extract_labels(const std::vector& labels, - const std::vector& tables, - VOpt opt) { +bl::result> extract_labels( + const std::vector& labels, const std::vector& tables, + VOpt opt) { std::vector output_labels; for (const auto& label : labels) { if (opt == VOpt::kStart) { @@ -48,7 +49,9 @@ std::vector extract_labels(const std::vector& labels, output_labels.push_back(label.dst_label); } } else { - LOG(FATAL) << "not support" << static_cast(opt); + LOG(ERROR) << "Vopt: " << static_cast(opt) << " not supported"; + RETURN_UNSUPPORTED_ERROR("Vopt not supported: " + + std::to_string(static_cast(opt))); } } return output_labels; @@ -56,9 +59,10 @@ std::vector extract_labels(const std::vector& labels, class GetV { public: template - static Context get_vertex_from_edges(const ReadTransaction& txn, - Context&& ctx, const GetVParams& params, - const PRED_T& pred) { + static bl::result get_vertex_from_edges(const ReadTransaction& txn, + Context&& ctx, + const GetVParams& params, + const PRED_T& pred) { std::vector shuffle_offset; auto col = ctx.get(params.tag); if (col->column_type() == ContextColumnType::kPath) { @@ -97,7 +101,9 @@ class GetV { } else if (opt == VOpt::kEnd) { output_vertex_label = edge_label.dst_label; } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Vopt: " << static_cast(opt) << " not supported"; + RETURN_UNSUPPORTED_ERROR("Vopt not supported: " + + std::to_string(static_cast(opt))); } // params tables size may be 0 if (params.tables.size() == 1) { @@ -123,7 +129,9 @@ class GetV { } }); } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Vopt: " << static_cast(opt) << " not supported"; + RETURN_UNSUPPORTED_ERROR("Vopt not supported: " + + std::to_string(static_cast(opt))); } ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); return ctx; @@ -139,8 +147,8 @@ class GetV { } } - auto labels = - extract_labels(input_edge_list.get_labels(), params.tables, opt); + BOOST_LEAF_AUTO(labels, extract_labels(input_edge_list.get_labels(), + params.tables, opt)); if (labels.size() == 0) { MLVertexColumnBuilder builder; ctx.set_with_reshuffle(params.alias, builder.finish(), {}); @@ -171,7 +179,9 @@ class GetV { } }); } else { - LOG(FATAL) << "not support"; + LOG(ERROR) << "Vopt: " << static_cast(opt) << " not supported"; + RETURN_UNSUPPORTED_ERROR("Vopt not supported: " + + std::to_string(static_cast(opt))); } ctx.set_with_reshuffle(params.alias, builder.finish(), shuffle_offset); return ctx; @@ -337,9 +347,11 @@ class GetV { return ctx; } } - - LOG(FATAL) << "not support" << static_cast(column->edge_column_type()); - return ctx; + LOG(ERROR) << "Unsupported edge column type: " + << static_cast(column->edge_column_type()); + RETURN_UNSUPPORTED_ERROR( + "Unsupported edge column type: " + + std::to_string(static_cast(column->edge_column_type()))); } template diff --git a/flex/engines/graph_db/runtime/common/operators/intersect.cc b/flex/engines/graph_db/runtime/common/operators/intersect.cc index ec98818c5359..458fb2a48fbd 100644 --- a/flex/engines/graph_db/runtime/common/operators/intersect.cc +++ b/flex/engines/graph_db/runtime/common/operators/intersect.cc @@ -31,9 +31,9 @@ static void ensure_sorted(std::shared_ptr> idx_col, } } -Context Intersect::intersect(Context&& ctx, - std::vector>&& ctxs, - int alias) { +bl::result Intersect::intersect( + Context&& ctx, std::vector>&& ctxs, + int alias) { std::vector>, std::shared_ptr>> cols; @@ -99,11 +99,13 @@ Context Intersect::intersect(Context&& ctx, } } } - - LOG(FATAL) << "not support"; + LOG(ERROR) << "Currently we only support intersect on two columns"; + RETURN_NOT_IMPLEMENTED_ERROR( + "Currently we only support intersect on two columns"); } -static Context intersect_impl(std::vector&& ctxs, int key) { +static bl::result intersect_impl(std::vector&& ctxs, + int key) { if (ctxs[0].get(key)->column_type() == ContextColumnType::kVertex) { if (ctxs.size() == 2) { auto& vlist0 = @@ -176,11 +178,13 @@ static Context intersect_impl(std::vector&& ctxs, int key) { return ctxs[0]; } } - LOG(FATAL) << "not support"; - return Context(); + LOG(ERROR) << "Currently we only support intersect on vertex columns"; + RETURN_NOT_IMPLEMENTED_ERROR( + "Currently we only support intersect on vertex " + "columns"); } -Context Intersect::intersect(std::vector&& ctxs, int key) { +bl::result Intersect::intersect(std::vector&& ctxs, int key) { return intersect_impl(std::move(ctxs), key); } diff --git a/flex/engines/graph_db/runtime/common/operators/intersect.h b/flex/engines/graph_db/runtime/common/operators/intersect.h index 597aea5ab2bd..7c09e3d00324 100644 --- a/flex/engines/graph_db/runtime/common/operators/intersect.h +++ b/flex/engines/graph_db/runtime/common/operators/intersect.h @@ -20,6 +20,7 @@ #include #include "flex/engines/graph_db/runtime/common/context.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" namespace gs { @@ -27,11 +28,11 @@ namespace runtime { class Intersect { public: - static Context intersect(Context&& ctx, - std::vector>&& ctxs, - int alias); + static bl::result intersect( + Context&& ctx, std::vector>&& ctxs, + int alias); - static Context intersect(std::vector&& ctxs, int key); + static bl::result intersect(std::vector&& ctxs, int key); }; } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/operators/join.cc b/flex/engines/graph_db/runtime/common/operators/join.cc index 04e9dcd6cc41..c6b46f4c10fd 100644 --- a/flex/engines/graph_db/runtime/common/operators/join.cc +++ b/flex/engines/graph_db/runtime/common/operators/join.cc @@ -16,10 +16,13 @@ #include "flex/engines/graph_db/runtime/common/operators/join.h" #include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { -Context Join::join(Context&& ctx, Context&& ctx2, const JoinParams& params) { +bl::result Join::join(Context&& ctx, Context&& ctx2, + const JoinParams& params) { CHECK(params.left_columns.size() == params.right_columns.size()) << "Join columns size mismatch"; if (params.join_type == JoinKind::kSemiJoin || @@ -176,8 +179,13 @@ Context Join::join(Context&& ctx, Context&& ctx2, const JoinParams& params) { } return ctx; + } else { + LOG(ERROR) << "Unsupported join type: " + << static_cast(params.join_type); + RETURN_NOT_IMPLEMENTED_ERROR( + "Join of type " + std::to_string(static_cast(params.join_type)) + + " is not supported"); } - LOG(FATAL) << "Unsupported join type"; return Context(); } diff --git a/flex/engines/graph_db/runtime/common/operators/join.h b/flex/engines/graph_db/runtime/common/operators/join.h index 1e8360b3ce58..cf6976b60d4c 100644 --- a/flex/engines/graph_db/runtime/common/operators/join.h +++ b/flex/engines/graph_db/runtime/common/operators/join.h @@ -19,6 +19,8 @@ #include #include "flex/engines/graph_db/runtime/common/context.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { @@ -30,7 +32,8 @@ struct JoinParams { class Join { public: - static Context join(Context&& ctx, Context&& ctx2, const JoinParams& params); + static bl::result join(Context&& ctx, Context&& ctx2, + const JoinParams& params); }; } // namespace runtime } // namespace gs diff --git a/flex/engines/graph_db/runtime/common/operators/path_expand.cc b/flex/engines/graph_db/runtime/common/operators/path_expand.cc index 2d2237411b66..56d1a4928dda 100644 --- a/flex/engines/graph_db/runtime/common/operators/path_expand.cc +++ b/flex/engines/graph_db/runtime/common/operators/path_expand.cc @@ -19,8 +19,9 @@ namespace gs { namespace runtime { -Context PathExpand::edge_expand_v(const ReadTransaction& txn, Context&& ctx, - const PathExpandParams& params) { +bl::result PathExpand::edge_expand_v(const ReadTransaction& txn, + Context&& ctx, + const PathExpandParams& params) { std::vector shuffle_offset; if (params.labels.size() == 1) { if (params.dir == Direction::kOut) { @@ -85,10 +86,10 @@ Context PathExpand::edge_expand_v(const ReadTransaction& txn, Context&& ctx, CHECK_GE(params.hop_lower, 0); CHECK_GE(params.hop_upper, params.hop_lower); if (params.hop_lower == 0) { - LOG(FATAL) << "xxx"; + RETURN_BAD_REQUEST_ERROR("hop_lower should be greater than 0"); } else { if (params.hop_upper == 1) { - LOG(FATAL) << "xxx"; + RETURN_BAD_REQUEST_ERROR("hop_upper should be greater than 1"); } else { input_vertex_list.foreach_vertex( [&](size_t index, label_t label, vid_t v) { @@ -262,14 +263,17 @@ Context PathExpand::edge_expand_v(const ReadTransaction& txn, Context&& ctx, ctx.set_with_reshuffle_beta(params.alias, builder.finish(), shuffle_offset, params.keep_cols); return ctx; + } else { + LOG(ERROR) << "Not implemented yet"; + RETURN_NOT_IMPLEMENTED_ERROR("Not implemented yet for direction in"); } } - LOG(FATAL) << "not support..."; return ctx; } -Context PathExpand::edge_expand_p(const ReadTransaction& txn, Context&& ctx, - const PathExpandParams& params) { +bl::result PathExpand::edge_expand_p(const ReadTransaction& txn, + Context&& ctx, + const PathExpandParams& params) { std::vector shuffle_offset; auto& input_vertex_list = *std::dynamic_pointer_cast(ctx.get(params.start_tag)); @@ -383,8 +387,10 @@ Context PathExpand::edge_expand_p(const ReadTransaction& txn, Context&& ctx, ctx.set_with_reshuffle_beta(params.alias, builder.finish(), shuffle_offset, params.keep_cols); return ctx; + } else { + LOG(ERROR) << "Not implemented yet"; + RETURN_NOT_IMPLEMENTED_ERROR("Not implemented yet for direction in"); } - LOG(FATAL) << "not support..."; return ctx; } diff --git a/flex/engines/graph_db/runtime/common/operators/path_expand.h b/flex/engines/graph_db/runtime/common/operators/path_expand.h index 8a70326428e4..b89b76cc7019 100644 --- a/flex/engines/graph_db/runtime/common/operators/path_expand.h +++ b/flex/engines/graph_db/runtime/common/operators/path_expand.h @@ -24,6 +24,8 @@ #include "flex/engines/graph_db/runtime/common/context.h" #include "flex/engines/graph_db/runtime/common/types.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { @@ -42,15 +44,18 @@ class PathExpand { public: // PathExpand(expandOpt == Vertex && alias == -1 && resultOpt == END_V) + // GetV(opt == END) - static Context edge_expand_v(const ReadTransaction& txn, Context&& ctx, - const PathExpandParams& params); - static Context edge_expand_p(const ReadTransaction& txn, Context&& ctx, - const PathExpandParams& params); + static bl::result edge_expand_v(const ReadTransaction& txn, + Context&& ctx, + const PathExpandParams& params); + static bl::result edge_expand_p(const ReadTransaction& txn, + Context&& ctx, + const PathExpandParams& params); template - static Context edge_expand_v_pred(const ReadTransaction& txn, Context&& ctx, - const PathExpandParams& params, - const PRED_T& pred) { + static bl::result edge_expand_v_pred(const ReadTransaction& txn, + Context&& ctx, + const PathExpandParams& params, + const PRED_T& pred) { std::vector shuffle_offset; if (params.labels.size() == 1 && params.labels[0].src_label == params.labels[0].dst_label) { @@ -148,7 +153,9 @@ class PathExpand { return ctx; } } - LOG(FATAL) << "not support..."; + RETURN_UNSUPPORTED_ERROR( + "Unsupported path expand. Currently only support " + "single edge label expand with src_label = dst_label."); return ctx; } }; diff --git a/flex/engines/graph_db/runtime/common/operators/project.h b/flex/engines/graph_db/runtime/common/operators/project.h index 907f666bd77e..bc6fb0e5fcb3 100644 --- a/flex/engines/graph_db/runtime/common/operators/project.h +++ b/flex/engines/graph_db/runtime/common/operators/project.h @@ -22,6 +22,8 @@ #include "flex/engines/graph_db/runtime/common/columns/i_context_column.h" #include "flex/engines/graph_db/runtime/common/columns/value_columns.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { @@ -133,7 +135,7 @@ class Project { } template - static Context map_value_general( + static bl::result map_value_general( const ReadTransaction& txn, Context&& ctx, const std::vector>& expressions, bool is_append) { if (!is_append) { @@ -146,9 +148,8 @@ class Project { } } } - - LOG(FATAL) << "not support"; - return ctx; + RETURN_UNSUPPORTED_ERROR( + "Currently we don't support project with is_append=true"); } }; diff --git a/flex/engines/graph_db/runtime/common/operators/scan.cc b/flex/engines/graph_db/runtime/common/operators/scan.cc index 0ef9da87acd1..9b260008cd71 100644 --- a/flex/engines/graph_db/runtime/common/operators/scan.cc +++ b/flex/engines/graph_db/runtime/common/operators/scan.cc @@ -15,11 +15,14 @@ #include "flex/engines/graph_db/runtime/common/operators/scan.h" +#include "flex/engines/graph_db/runtime/common/leaf_utils.h" + namespace gs { namespace runtime { -Context Scan::find_vertex_with_id(const ReadTransaction& txn, label_t label, - const Any& pk, int alias, bool scan_oid) { +bl::result Scan::find_vertex_with_id(const ReadTransaction& txn, + label_t label, const Any& pk, + int alias, bool scan_oid) { if (scan_oid) { SLVertexColumnBuilder builder(label); vid_t vid; @@ -38,7 +41,9 @@ Context Scan::find_vertex_with_id(const ReadTransaction& txn, label_t label, } else if (pk.type == PropertyType::kInt32) { gid = pk.AsInt32(); } else { - LOG(FATAL) << "Unsupported primary key type"; + LOG(ERROR) << "Unsupported primary key type " << pk.type; + RETURN_UNSUPPORTED_ERROR("Unsupported primary key type" + + pk.type.ToString()); } if (GlobalId::get_label_id(gid) == label) { vid = GlobalId::get_vid(gid); diff --git a/flex/engines/graph_db/runtime/common/operators/scan.h b/flex/engines/graph_db/runtime/common/operators/scan.h index c334a3d0df8f..55ed52f225a7 100644 --- a/flex/engines/graph_db/runtime/common/operators/scan.h +++ b/flex/engines/graph_db/runtime/common/operators/scan.h @@ -19,6 +19,9 @@ #include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h" #include "flex/engines/graph_db/runtime/common/context.h" +#include "boost/leaf.hpp" + +namespace bl = boost::leaf; namespace gs { namespace runtime { @@ -29,9 +32,9 @@ struct ScanParams { class Scan { public: template - static Context scan_vertex(const ReadTransaction& txn, - const ScanParams& params, - const PRED_T& predicate) { + static bl::result scan_vertex(const ReadTransaction& txn, + const ScanParams& params, + const PRED_T& predicate) { Context ctx; if (params.tables.size() == 1) { label_t label = params.tables[0]; @@ -150,8 +153,9 @@ class Scan { return ctx; } - static Context find_vertex_with_id(const ReadTransaction& txn, label_t label, - const Any& pk, int alias, bool scan_oid); + static bl::result find_vertex_with_id(const ReadTransaction& txn, + label_t label, const Any& pk, + int alias, bool scan_oid); }; } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/types.h b/flex/engines/graph_db/runtime/common/types.h index cd4e576733b6..28f2a5784bff 100644 --- a/flex/engines/graph_db/runtime/common/types.h +++ b/flex/engines/graph_db/runtime/common/types.h @@ -84,4 +84,37 @@ struct LabelTriplet { } // namespace gs +namespace std { + +// operator << for Direction +inline std::ostream& operator<<(std::ostream& os, + const gs::runtime::Direction& dir) { + switch (dir) { + case gs::runtime::Direction::kOut: + os << "OUT"; + break; + case gs::runtime::Direction::kIn: + os << "IN"; + break; + case gs::runtime::Direction::kBoth: + os << "BOTH"; + break; + } + return os; +} + +// std::to_string +inline std::string to_string(const gs::runtime::Direction& dir) { + switch (dir) { + case gs::runtime::Direction::kOut: + return "OUT"; + case gs::runtime::Direction::kIn: + return "IN"; + case gs::runtime::Direction::kBoth: + return "BOTH"; + } + return "UNKNOWN"; +} +} // namespace std + #endif // RUNTIME_COMMON_TYPES_H_ \ No newline at end of file diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 56020f9538b6..2b5391d35683 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -863,7 +863,7 @@ graph_db_http_handler::graph_db_http_handler(uint16_t http_port, actors_running_(true) { current_graph_query_handlers_.resize(shard_num); all_graph_query_handlers_.resize(shard_num); - adhoc_query_handlers_.resize(shard_num); + adhoc_query_handlers_.resize(shard_num); vertex_handlers_.resize(shard_num); edge_handlers_.resize(shard_num); if (enable_adhoc_handlers_) { diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index c05f8711dd0a..681cd9764360 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -60,7 +60,8 @@ using admin_query_result = payload>; using graph_management_param = payload>; using graph_management_query_param = - payload>>; + payload>>; using procedure_query_param = payload>; using create_procedure_query_param = diff --git a/flex/scripts/install_dependencies.sh b/flex/scripts/install_dependencies.sh index cc298d4463f6..b70039ef6c51 100755 --- a/flex/scripts/install_dependencies.sh +++ b/flex/scripts/install_dependencies.sh @@ -14,11 +14,20 @@ echo "parallelism: $parallelism" sudo apt-get update && sudo apt install -y \ ninja-build ragel libhwloc-dev libnuma-dev libpciaccess-dev vim wget curl \ git g++ libunwind-dev libgoogle-glog-dev cmake libopenmpi-dev default-jdk libcrypto++-dev \ - libboost-all-dev libxml2-dev protobuf-compiler libprotobuf-dev libncurses5-dev libcurl4-openssl-dev + libxml2-dev protobuf-compiler libprotobuf-dev libncurses5-dev libcurl4-openssl-dev sudo apt install -y xfslibs-dev libgnutls28-dev liblz4-dev maven openssl pkg-config \ libsctp-dev gcc make python3 systemtap-sdt-dev libtool libyaml-cpp-dev \ libc-ares-dev stow libfmt-dev diffutils valgrind doxygen python3-pip net-tools graphviz +# install boost +pushd /tmp/ +curl -L -o boost_1_75_0.tar.gz "https://graphscope.oss-cn-beijing.aliyuncs.com/dependencies/boost_1_75_0.tar.gz" +tar -xzf boost_1_75_0.tar.gz +pushd boost_1_75_0 && ./bootstrap.sh --with-libraries=system,filesystem,context,atomic,program_options,regex,thread,chrono,date_time,test # unit_test_framework used by seastar +sudo ./b2 install link=shared runtime-link=shared variant=release threading=multi +popd && sudo rm -rf boost_1_75_0 +rm boost_1_75_0.tar.gz + pushd /tmp git clone https://github.com/alibaba/libgrape-lite.git cd libgrape-lite diff --git a/flex/utils/property/types.cc b/flex/utils/property/types.cc index abf74593d0c4..456524b07e3f 100644 --- a/flex/utils/property/types.cc +++ b/flex/utils/property/types.cc @@ -226,6 +226,53 @@ bool PropertyType::IsVarchar() const { return type_enum == impl::PropertyTypeImpl::kVarChar; } +std::string PropertyType::ToString() const { + switch (type_enum) { + case impl::PropertyTypeImpl::kEmpty: + return "Empty"; + case impl::PropertyTypeImpl::kBool: + return "Bool"; + case impl::PropertyTypeImpl::kUInt8: + return "UInt8"; + case impl::PropertyTypeImpl::kUInt16: + return "UInt16"; + case impl::PropertyTypeImpl::kInt32: + return "Int32"; + case impl::PropertyTypeImpl::kUInt32: + return "UInt32"; + case impl::PropertyTypeImpl::kFloat: + return "Float"; + case impl::PropertyTypeImpl::kInt64: + return "Int64"; + case impl::PropertyTypeImpl::kUInt64: + return "UInt64"; + case impl::PropertyTypeImpl::kDouble: + return "Double"; + case impl::PropertyTypeImpl::kDate: + return "Date"; + case impl::PropertyTypeImpl::kDay: + return "Day"; + case impl::PropertyTypeImpl::kString: + return "String"; + case impl::PropertyTypeImpl::kStringView: + return "StringView"; + case impl::PropertyTypeImpl::kStringMap: + return "StringMap"; + case impl::PropertyTypeImpl::kVertexGlobalId: + return "VertexGlobalId"; + case impl::PropertyTypeImpl::kLabel: + return "Label"; + case impl::PropertyTypeImpl::kRecordView: + return "RecordView"; + case impl::PropertyTypeImpl::kRecord: + return "Record"; + case impl::PropertyTypeImpl::kVarChar: + return "VarChar"; + default: + return "Unknown"; + } +} + /////////////////////////////// Get Type Instance ////////////////////////////////// PropertyType PropertyType::Empty() { @@ -508,7 +555,8 @@ Any ConvertStringToAny(const std::string& value, const gs::PropertyType& type) { return gs::Any(gs::Date(static_cast(std::stoll(value)))); } else if (type == gs::PropertyType::Day()) { return gs::Any(gs::Day(static_cast(std::stoll(value)))); - } else if (type == gs::PropertyType::String() || type == gs::PropertyType::StringMap()) { + } else if (type == gs::PropertyType::String() || + type == gs::PropertyType::StringMap()) { return gs::Any(std::string(value)); } else if (type == gs::PropertyType::Int64()) { return gs::Any(static_cast(std::stoll(value))); diff --git a/flex/utils/property/types.h b/flex/utils/property/types.h index deb37b940650..ad7eb1f36f5a 100644 --- a/flex/utils/property/types.h +++ b/flex/utils/property/types.h @@ -107,6 +107,7 @@ struct PropertyType { } bool IsVarchar() const; + std::string ToString() const; static PropertyType Empty(); static PropertyType Bool(); diff --git a/flex/utils/result.h b/flex/utils/result.h index e04d4c37c250..2ebd56505a37 100644 --- a/flex/utils/result.h +++ b/flex/utils/result.h @@ -148,6 +148,12 @@ struct is_gs_status_type : std::true_type {}; // calling code of a function, the function name, and the variable name. #define FLEX_AUTO(var, expr) ASSIGN_AND_RETURN_IF_NOT_OK(auto var, expr) +// Return boost::leaf::error object with error code and error message, + +#define RETURN_FLEX_LEAF_ERROR(code, msg) \ + return ::boost::leaf::new_error( \ + gs::Status(::gs::flex::interactive::Code::code, msg)) + } // namespace gs namespace std { diff --git a/flex/utils/service_utils.cc b/flex/utils/service_utils.cc index 9421a7e01d19..fb834fbdec41 100644 --- a/flex/utils/service_utils.cc +++ b/flex/utils/service_utils.cc @@ -20,13 +20,6 @@ namespace gs { static unsigned long long lastTotalUser, lastTotalUserLow, lastTotalSys, lastTotalIdle; -FlexException::FlexException(std::string&& error_msg) - : std::exception(), _err_msg(error_msg) {} - -FlexException::~FlexException() {} - -const char* FlexException::what() const noexcept { return _err_msg.c_str(); } - // get current executable's directory std::string get_current_dir() { char buf[1024]; diff --git a/flex/utils/service_utils.h b/flex/utils/service_utils.h index cf969b3845d0..2ee77efa5fc3 100644 --- a/flex/utils/service_utils.h +++ b/flex/utils/service_utils.h @@ -123,17 +123,6 @@ inline std::string jsonToString(const nlohmann::json& json) { } } -class FlexException : public std::exception { - public: - explicit FlexException(std::string&& error_msg); - ~FlexException() override; - - const char* what() const noexcept override; - - private: - std::string _err_msg; -}; - // Get the directory of the current executable std::string get_current_dir();