Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Add error handling for adhoc queries #4188

Merged
merged 14 commits into from
Sep 6, 2024
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"name": "GraphScope",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:latest",
"image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64",

// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
Expand Down
15 changes: 4 additions & 11 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4

Expand Down Expand Up @@ -121,7 +121,6 @@ jobs:
GS_TEST_DIR: ${{ github.workspace }}/gstest
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
rm -rf ${TMP_INTERACTIVE_WORKSPACE}
cd ${GITHUB_WORKSPACE}/flex/build/
Expand All @@ -137,7 +136,6 @@ jobs:
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
cd ${GITHUB_WORKSPACE}/flex/interactive/sdk/

Expand Down Expand Up @@ -210,7 +208,6 @@ jobs:
TMP_INTERACTIVE_WORKSPACE: /tmp/temp_workspace
PLUGIN_DIR: /tmp/temp_workspace/data/modern_graph/plugins
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
LD_LIBRARY_PATH: /usr/local/lib
run: |
rm -rf ${TMP_INTERACTIVE_WORKSPACE}
cd ${GITHUB_WORKSPACE}/flex/build/
Expand Down Expand Up @@ -271,7 +268,6 @@ jobs:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
export ENGINE_TYPE=hiactor
Expand All @@ -285,7 +281,6 @@ jobs:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
export ENGINE_TYPE=hiactor
Expand All @@ -299,7 +294,6 @@ jobs:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
export ENGINE_TYPE=hiactor
Expand All @@ -313,7 +307,6 @@ jobs:
GS_TEST_DIR: ${{ github.workspace }}/gstest
HOME : /home/graphscope/
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
LD_LIBRARY_PATH: /usr/local/lib
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
export ENGINE_TYPE=hiactor
Expand Down Expand Up @@ -353,7 +346,7 @@ jobs:
test-cmake-options:
runs-on: ubuntu-20.04
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
strategy:
matrix:
BUILD_TEST: [ON, OFF]
Expand All @@ -373,7 +366,7 @@ jobs:
test-AOCC-compilation:
runs-on: ubuntu-20.04
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4

Expand Down Expand Up @@ -402,7 +395,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4

Expand Down
1 change: 1 addition & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ find_package(Boost REQUIRED COMPONENTS system filesystem
# required by folly
context program_options regex thread date_time)
add_definitions("-DBOOST_BIND_GLOBAL_PLACEHOLDERS")
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})

#find hiactor----------------------------------------------------------------------
find_package(Hiactor)
Expand Down
33 changes: 30 additions & 3 deletions flex/bin/adhoc_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"

namespace bpo = boost::program_options;
namespace bl = boost::leaf;

std::string read_pb(const std::string& filename) {
std::ifstream file(filename, std::ios::binary);
Expand Down Expand Up @@ -74,6 +75,32 @@ void load_params(const std::string& filename,
}
}

gs::runtime::Context eval_plan(
const physical::PhysicalPlan& plan, gs::ReadTransaction& txn,
const std::map<std::string, std::string>& params) {
gs::runtime::Context ctx;
{
ctx = bl::try_handle_all(
[&plan, &txn, &params]() {
return gs::runtime::runtime_eval(plan, txn, params);
},
[&ctx](const gs::Status& err) {
LOG(FATAL) << "Error in execution: " << err.error_message();
return ctx;
},
[&](const bl::error_info& err) {
LOG(FATAL) << "boost leaf error: " << err.error().value() << ", "
<< err.exception()->what();
return ctx;
},
[&]() {
LOG(FATAL) << "Unknown error in execution";
return ctx;
});
}
return ctx;
}

int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
desc.add_options()("help", "Display help message")(
Expand Down Expand Up @@ -158,7 +185,7 @@ int main(int argc, char** argv) {
double t1 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
}
Expand All @@ -167,7 +194,7 @@ int main(int argc, char** argv) {
double t2 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
Expand All @@ -177,7 +204,7 @@ int main(int argc, char** argv) {
double t3 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
Expand Down
51 changes: 48 additions & 3 deletions flex/engines/graph_db/app/adhoc_app.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
#include "flex/engines/graph_db/app/adhoc_app.h"
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/runtime/adhoc/operators/operators.h"
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"

#include "flex/proto_generated_gie/physical.pb.h"

#include <string>

namespace bl = boost::leaf;

namespace gs {

bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input,
Expand All @@ -20,7 +37,35 @@ bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input,

LOG(INFO) << "plan: " << plan.DebugString();

auto ctx = runtime::runtime_eval(plan, txn, {});
gs::runtime::Context ctx;
gs::Status status = gs::Status::OK();
{
ctx = bl::try_handle_all(
[&plan, &txn]() { return runtime::runtime_eval(plan, txn, {}); },
[&ctx, &status](const gs::Status& err) {
status = err;
return ctx;
},
[&](const bl::error_info& err) {
status = gs::Status(
gs::StatusCode::INTERNAL_ERROR,
"BOOST LEAF Error: " + std::to_string(err.error().value()) +
", Exception: " + err.exception()->what());
return ctx;
},
[&]() {
status = gs::Status(gs::StatusCode::UNKNOWN, "Unknown error");
return ctx;
});
}

if (!status.ok()) {
LOG(ERROR) << "Error: " << status.ToString();
// We encode the error message to the output, so that the client can
// get the error message.
output.put_string(status.ToString());
return false;
}

runtime::eval_sink(ctx, txn, output);

Expand Down
18 changes: 14 additions & 4 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,20 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count());
++query_num_;
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
"Query failed for procedure id:" + std::to_string((int) type),
result_buffer);
// When query failed, we assume the user may put the error message in the
// output buffer.
// For example, for adhoc_app.cc, if the query failed, the error info will
// be put in the output buffer.
if (result_buffer.size() > 0) {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
std::string{result_buffer.data(), result_buffer.size()}, result_buffer);
} else {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
"Query failed for procedure id:" + std::to_string((int) type),
result_buffer);
}
}

void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); }
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ install_flex_target(runtime_common)
file(GLOB_RECURSE ADHOC_SOURCES "adhoc/*.cc")
add_library(runtime_adhoc SHARED ${ADHOC_SOURCES})
target_link_libraries(runtime_adhoc runtime_common)
target_link_libraries(runtime_adhoc Boost::headers)
install_flex_target(runtime_adhoc)


4 changes: 2 additions & 2 deletions flex/engines/graph_db/runtime/adhoc/operators/dedup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace gs {

namespace runtime {

Context eval_dedup(const algebra::Dedup& opr, const ReadTransaction& txn,
Context&& ctx) {
bl::result<Context> eval_dedup(const algebra::Dedup& opr,
const ReadTransaction& txn, Context&& ctx) {
std::vector<size_t> keys;
std::vector<std::function<RTAny(size_t)>> vars;
int keys_num = opr.keys_size();
Expand Down
19 changes: 13 additions & 6 deletions flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ namespace gs {

namespace runtime {

Context eval_edge_expand(const physical::EdgeExpand& opr,
const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params,
const physical::PhysicalOpr_MetaData& meta) {
bl::result<Context> eval_edge_expand(
const physical::EdgeExpand& opr, const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params,
const physical::PhysicalOpr_MetaData& meta) {
int v_tag;
if (!opr.has_v_tag()) {
v_tag = -1;
Expand All @@ -49,7 +49,9 @@ Context eval_edge_expand(const physical::EdgeExpand& opr,
if (opr.expand_opt() ==
physical::EdgeExpand_ExpandOpt::EdgeExpand_ExpandOpt_VERTEX) {
if (query_params.has_predicate()) {
LOG(FATAL) << "not support";
LOG(ERROR) << "edge expand vertex with predicate is not supported";
RETURN_UNSUPPORTED_ERROR(
"edge expand vertex with predicate is not supported");
} else {
EdgeExpandParams eep;
eep.v_tag = v_tag;
Expand Down Expand Up @@ -82,7 +84,12 @@ Context eval_edge_expand(const physical::EdgeExpand& opr,
eep);
}
} else {
LOG(FATAL) << "not support";
LOG(ERROR) << "EdgeExpand with expand_opt: " << opr.expand_opt()
<< " is "
"not supported";
RETURN_UNSUPPORTED_ERROR(
"EdgeExpand with expand_opt is not supported: " +
std::to_string(static_cast<int>(opr.expand_opt())));
}
return ctx;
}
Expand Down
9 changes: 5 additions & 4 deletions flex/engines/graph_db/runtime/adhoc/operators/get_v.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ VOpt parse_opt(const physical::GetV_VOpt& opt) {
}
}

Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn,
Context&& ctx,
const std::map<std::string, std::string>& params) {
bl::result<Context> eval_get_v(
const physical::GetV& opr, const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params) {
int tag = -1;
if (opr.has_tag()) {
tag = opr.tag().value();
Expand Down Expand Up @@ -76,7 +76,8 @@ Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn,
}
}

LOG(FATAL) << "not support";
LOG(ERROR) << "Unsupported GetV operation: " << opr.DebugString();
RETURN_UNSUPPORTED_ERROR("Unsupported GetV operation: " + opr.DebugString());
return ctx;
}

Expand Down
Loading
Loading