From ba952b067d3c81f62ac734b219eb3f5fed529cc3 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Thu, 19 Dec 2024 11:41:04 +0800 Subject: [PATCH 1/2] feat(interactive): Make use of all available cpus at startup (#4343) At startup, try to make use of all available cpus on hosts. Also fix a deep copy bug in `test_robustness.py`. The CI failures are due to rust version, and will be fixed in PR: https://github.com/alibaba/GraphScope/pull/4373 --- .github/workflows/pr-check.yml | 6 ++- .../graph_db/runtime/adhoc/expr_impl.h | 28 ++++++---- .../handler/graph_db_http_handler.cc | 2 +- .../handler/graph_db_http_handler.h | 2 +- .../python/gs_interactive/client/status.py | 3 ++ .../python/gs_interactive/tests/conftest.py | 31 +++++++++-- .../gs_interactive/tests/test_robustness.py | 42 ++++++++++++++- flex/interactive/sdk/python/setup.cfg | 2 +- .../metadata/local_file_metadata_store.cc | 53 +++++++++++-------- .../metadata/local_file_metadata_store.h | 8 ++- flex/tests/hqps/hqps_robust_test.sh | 2 +- .../graphscope/common/ir/meta/GraphId.java | 5 ++ k8s/dockerfiles/interactive-entrypoint.sh | 3 ++ 13 files changed, 142 insertions(+), 45 deletions(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 7f59f2182263..a66be7b6d96c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -213,11 +213,15 @@ jobs: python3 -m black --check --diff . python3 -m flake8 . popd - pushd flex/interactive/sdk/python + # we need to generate the code first + pushd flex/interactive/sdk + bash generate_sdk.sh -g python + pushd python python3 -m isort --check --diff . python3 -m black --check --diff . python3 -m flake8 . popd + popd - name: Generate Docs shell: bash diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.h b/flex/engines/graph_db/runtime/adhoc/expr_impl.h index 20a764de0696..91046e31e19c 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.h +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.h @@ -85,17 +85,23 @@ class WithInExpr : public ExprBase { WithInExpr(const ReadTransaction& txn, const Context& ctx, std::unique_ptr&& key, const common::Value& array) : key_(std::move(key)) { - if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI64Array); - size_t len = array.i64_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i64_array().item(idx)); - } - } else if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI32Array); - size_t len = array.i32_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i32_array().item(idx)); + if constexpr ((std::is_same_v) || + (std::is_same_v) ) { + // Implicitly convert to T + if (array.item_case() == common::Value::kI64Array) { + size_t len = array.i64_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i64_array().item(idx)); + } + } else if (array.item_case() == common::Value::kI32Array) { + size_t len = array.i32_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i32_array().item(idx)); + } + } else { + LOG(FATAL) << "Fail to construct WithInExpr of type " + << typeid(T).name() << " with array of type " + << array.item_case(); } } else if constexpr (std::is_same_v) { CHECK(array.item_case() == common::Value::kStrArray); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 319920459058..4ff8c9e540ff 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler { bool start() override { if (get_executors()[StoppableHandler::shard_id()].size() > 0) { - LOG(ERROR) << "The actors have been already created!"; + VLOG(10) << "The actors have been already created!"; return false; } return StoppableHandler::start_scope( diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h index 6bc5c906910e..22090e66dc41 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.h +++ b/flex/engines/http_server/handler/graph_db_http_handler.h @@ -69,7 +69,7 @@ class StoppableHandler : public seastar::httpd::handler_base { } catch (const std::exception& e) { // In case the scope is already cancelled, we should ignore the // exception. - LOG(INFO) << "Failed to cancel IC scope: " << e.what(); + VLOG(1) << "Failed to cancel IC scope: " << e.what(); } func(); return seastar::make_ready_future<>(); diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 13af0e0ed660..21564752ecdf 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -23,6 +23,7 @@ from gs_interactive.exceptions import NotFoundException from gs_interactive.exceptions import ServiceException from urllib3.exceptions import MaxRetryError +from urllib3.exceptions import ProtocolError from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode @@ -108,6 +109,8 @@ def from_exception(exception: ApiException): return Status(StatusCode.INTERNAL_ERROR, exception.body) elif isinstance(exception, MaxRetryError): return Status(StatusCode.INTERNAL_ERROR, exception) + elif isinstance(exception, ProtocolError): + return Status(StatusCode.INTERNAL_ERROR, exception) return Status( StatusCode.UNKNOWN, "Unknown Error from exception " + exception.body ) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 94f0b34a25f2..3617115b4740 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -17,6 +17,7 @@ # # get the directory of the current file +import copy import os import time @@ -27,6 +28,7 @@ from gs_interactive.client.session import Session from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateProcedureRequest +from gs_interactive.models import GetGraphSchemaResponse from gs_interactive.models import SchemaMapping from gs_interactive.models import StartServiceRequest from gs_interactive.models import UpdateProcedureRequest @@ -39,7 +41,7 @@ modern_graph_full = { - "name": "modern_graph", + "name": "full_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -120,7 +122,7 @@ } modern_graph_vertex_only = { - "name": "modern_graph", + "name": "vertex_only", "description": "This is a test graph, only contains vertex", "schema": { "vertex_types": [ @@ -148,7 +150,7 @@ } modern_graph_partial = { - "name": "modern_graph", + "name": "partial_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -336,7 +338,7 @@ def create_partial_modern_graph(interactive_session): @pytest.fixture(scope="function") def create_graph_with_custom_pk_name(interactive_session): - modern_graph_custom_pk_name = modern_graph_full.copy() + modern_graph_custom_pk_name = copy.deepcopy(modern_graph_full) for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]: vertex_type["properties"][0]["property_name"] = "custom_id" vertex_type["primary_keys"] = ["custom_id"] @@ -479,3 +481,24 @@ def start_service_on_graph(interactive_session, graph_id: str): assert resp.is_ok() # wait three second to let compiler get the new graph time.sleep(3) + + +def ensure_compiler_schema_ready( + interactive_session, neo4j_session: Neo4jSession, graph_id: str +): + rel_graph_meta = interactive_session.get_graph_schema(graph_id).get_value() + max_times = 10 + while True: + if max_times == 0: + raise Exception("compiler schema is not ready") + res = neo4j_session.run("CALL gs.procedure.meta.schema();") + val = res.single().value() + compiler_graph_schema = GetGraphSchemaResponse.from_json(val) + # print("compiler_graph_schema: ", compiler_graph_schema) + # print("rel_graph_meta: ", rel_graph_meta) + if compiler_graph_schema == rel_graph_meta: + break + print("compiler schema is not ready, wait for 1 second") + time.sleep(1) + max_times -= 1 + print("compiler schema is ready") diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index a144a1e07948..2fb50d138f4e 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -27,6 +27,7 @@ from gs_interactive.tests.conftest import call_procedure # noqa: E402 from gs_interactive.tests.conftest import create_procedure from gs_interactive.tests.conftest import delete_procedure +from gs_interactive.tests.conftest import ensure_compiler_schema_ready from gs_interactive.tests.conftest import import_data_to_full_modern_graph from gs_interactive.tests.conftest import import_data_to_partial_modern_graph from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph @@ -61,6 +62,9 @@ def test_query_on_vertex_only_graph( """ print("[Query on vertex only graph]") start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -69,6 +73,10 @@ def test_query_on_vertex_only_graph( import_data_to_vertex_only_modern_graph( interactive_session, create_vertex_only_modern_graph ) + start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -83,12 +91,19 @@ def test_query_on_partial_graph( print("[Query on partial graph]") # start service on new graph start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_partial_modern_graph( interactive_session, create_partial_modern_graph ) + start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) @@ -100,10 +115,17 @@ def test_query_on_full_modern_graph( """ print("[Query on full modern graph]") start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) @@ -129,6 +151,9 @@ def test_service_switching( ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure on graph_b_id @@ -139,6 +164,9 @@ def test_service_switching( "MATCH(n: person) return count(n);", ) start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id) @@ -156,6 +184,9 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure with name containing space, @@ -202,6 +233,9 @@ def test_builtin_procedure(interactive_session, neo4j_session, create_modern_gra ) # Call the builtin procedure start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure( neo4j_session, create_modern_graph, @@ -259,6 +293,10 @@ def test_list_jobs(interactive_session, create_vertex_only_modern_graph): def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_graph): print("[Test call procedure in cypher]") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) result = neo4j_session.run( 'MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;' ) @@ -276,6 +314,9 @@ def test_custom_pk_name( interactive_session, create_graph_with_custom_pk_name ) start_service_on_graph(interactive_session, create_graph_with_custom_pk_name) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_graph_with_custom_pk_name + ) result = neo4j_session.run( "MATCH (n: person) where n.custom_id = 4 return n.custom_id;" ) @@ -289,4 +330,3 @@ def test_custom_pk_name( ) records = result.fetch(1) assert len(records) == 1 and records[0]["$f0"] == 2 - start_service_on_graph(interactive_session, "1") diff --git a/flex/interactive/sdk/python/setup.cfg b/flex/interactive/sdk/python/setup.cfg index 1e1839250158..255781291114 100644 --- a/flex/interactive/sdk/python/setup.cfg +++ b/flex/interactive/sdk/python/setup.cfg @@ -3,7 +3,7 @@ profile = black ensure_newline_before_comments = True line_length = 88 force_single_line = True -skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py, +skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,gs_interactive/api_client.py,gs_interactive/__init__.py,gs_interactive/rest.py skip_glob = *_pb2.py,*_pb2_grpc.py,build/* [flake8] diff --git a/flex/storages/metadata/local_file_metadata_store.cc b/flex/storages/metadata/local_file_metadata_store.cc index 5f77d0cc2e7a..bcff4bf4ed38 100644 --- a/flex/storages/metadata/local_file_metadata_store.cc +++ b/flex/storages/metadata/local_file_metadata_store.cc @@ -181,8 +181,8 @@ Result LocalFileMetadataStore::UpdateMeta(const meta_kind_t& meta_kind, Result LocalFileMetadataStore::get_next_meta_key( - const LocalFileMetadataStore::meta_kind_t& meta_kind) const { - return std::to_string(get_max_id(meta_kind) + 1); + const LocalFileMetadataStore::meta_kind_t& meta_kind) { + return std::to_string(increase_and_get_id(meta_kind)); } std::string LocalFileMetadataStore::get_root_meta_dir() const { @@ -208,29 +208,38 @@ std::string LocalFileMetadataStore::get_meta_file(const meta_kind_t& meta_kind, return ret; } -int32_t LocalFileMetadataStore::get_max_id(const meta_kind_t& meta_kind) const { - // iterate all files in the directory, get the max id. - int max_id_ = 0; +// Guarded by meta_mutex_ outside. +int32_t LocalFileMetadataStore::increase_and_get_id( + const meta_kind_t& meta_kind) { auto dir = get_meta_kind_dir(meta_kind); - for (auto& p : std::filesystem::directory_iterator(dir)) { - if (std::filesystem::is_directory(p)) { - continue; - } - auto file_name = p.path().filename().string(); - if (file_name.find(META_FILE_PREFIX) != std::string::npos) { - auto id_str = file_name.substr(strlen(META_FILE_PREFIX)); - int32_t id; - try { - id = std::stoi(id_str); - } catch (std::invalid_argument& e) { - LOG(ERROR) << "Invalid id: " << id_str; - continue; - } - if (id > max_id_) { - max_id_ = id; - } + int max_id_ = 0; + // In the directory, we expect a file with name CUR_ID_FILE_NAME. + // If the file does not exist, we will create one with content "0". + auto cur_id_file = dir + "/" + CUR_ID_FILE_NAME; + if (!std::filesystem::exists(cur_id_file)) { + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to create file: " << cur_id_file; + return -1; } + out_file << "0"; + out_file.close(); + } + std::ifstream in_file(cur_id_file); + if (!in_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; } + in_file >> max_id_; + in_file.close(); + max_id_++; + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; + } + out_file << max_id_; + out_file.close(); return max_id_; } diff --git a/flex/storages/metadata/local_file_metadata_store.h b/flex/storages/metadata/local_file_metadata_store.h index 186aa68e7efd..bada3778d535 100644 --- a/flex/storages/metadata/local_file_metadata_store.h +++ b/flex/storages/metadata/local_file_metadata_store.h @@ -48,6 +48,7 @@ class LocalFileMetadataStore : public IMetaStore { static constexpr const char* METADATA_DIR = "METADATA"; static constexpr const char* META_FILE_PREFIX = "META_"; + static constexpr const char* CUR_ID_FILE_NAME = "CUR_ID"; LocalFileMetadataStore(const std::string& path); @@ -110,12 +111,15 @@ class LocalFileMetadataStore : public IMetaStore { update_func_t update_func) override; private: - Result get_next_meta_key(const meta_kind_t& meta_kind) const; + Result get_next_meta_key(const meta_kind_t& meta_kind); std::string get_root_meta_dir() const; std::string get_meta_kind_dir(const meta_kind_t& meta_kind) const; std::string get_meta_file(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; - int32_t get_max_id(const meta_kind_t& meta_kind) const; + /** + * For the specified meta_kind, increase the id and return the new id. + */ + int32_t increase_and_get_id(const meta_kind_t& meta_kind); bool is_key_exist(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; diff --git a/flex/tests/hqps/hqps_robust_test.sh b/flex/tests/hqps/hqps_robust_test.sh index 8090d8be8164..af21a57459ac 100644 --- a/flex/tests/hqps/hqps_robust_test.sh +++ b/flex/tests/hqps/hqps_robust_test.sh @@ -82,7 +82,7 @@ start_engine_service(){ fi cmd="${SERVER_BIN} -c ${config_path} --enable-admin-service true " - cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true &" + cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true > /tmp/engine.log 2>&1 & " echo "Start engine service with command: ${cmd}" eval ${cmd} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java index 8ddf0d75a511..4c63425263cd 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java @@ -51,4 +51,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode(id); } + + @Override + public String toString() { + return "GraphId{" + "id=" + id + '}'; + } } diff --git a/k8s/dockerfiles/interactive-entrypoint.sh b/k8s/dockerfiles/interactive-entrypoint.sh index 009323f6c31f..76715ec609d6 100644 --- a/k8s/dockerfiles/interactive-entrypoint.sh +++ b/k8s/dockerfiles/interactive-entrypoint.sh @@ -57,6 +57,9 @@ function prepare_workspace() { cp /opt/flex/share/interactive_config.yaml $engine_config_path #make sure the line which start with default_graph is changed to default_graph: ${DEFAULT_GRAPH_NAME} sed -i "s/default_graph:.*/default_graph: ${DEFAULT_GRAPH_NAME}/" $engine_config_path + # By default, we occupy the all available cpus + cpus=$(grep -c ^processor /proc/cpuinfo) + sed -i "s/thread_num_per_worker:.*/thread_num_per_worker: ${cpus}/" $engine_config_path echo "Using default graph: ${DEFAULT_GRAPH_NAME} to start the service" # copy the builtin graph From ce0ae5e2ac11e95358e0a2762891f7c15c180ffa Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Fri, 20 Dec 2024 13:58:45 +0800 Subject: [PATCH 2/2] fix(interactive): Fix yaml node parsing (#4377) As titled. --- .../sdk/python/gs_interactive/client/result.py | 3 +-- .../sdk/python/gs_interactive/client/session.py | 12 ++++++------ .../sdk/python/gs_interactive/client/status.py | 8 ++++---- flex/tests/hqps/interactive_config_test.yaml | 3 ++- flex/utils/yaml_utils.cc | 14 +++++++++++--- .../compiler/conf/ir.compiler.properties | 5 ++++- .../graphscope/common/config/GraphConfig.java | 3 +++ .../graphscope/common/config/YamlConfigs.java | 3 +++ .../common/ir/meta/reader/HttpIrMetaReader.java | 14 ++++++++++---- 9 files changed, 44 insertions(+), 21 deletions(-) diff --git a/flex/interactive/sdk/python/gs_interactive/client/result.py b/flex/interactive/sdk/python/gs_interactive/client/result.py index f5aea2b6417f..79ec0d126bb5 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/result.py +++ b/flex/interactive/sdk/python/gs_interactive/client/result.py @@ -20,9 +20,8 @@ from typing import TypeVar from gs_interactive.api_response import ApiResponse -from gs_interactive.exceptions import ApiException - from gs_interactive.client.status import Status +from gs_interactive.exceptions import ApiException # Define a generic type placeholder T = TypeVar("T") diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index 96da79ed7007..95ff753502a2 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -23,6 +23,11 @@ from typing import Optional from typing import Union +from pydantic import Field +from pydantic import StrictBytes +from pydantic import StrictStr +from typing_extensions import Annotated + from gs_interactive.api import AdminServiceGraphManagementApi from gs_interactive.api import AdminServiceJobManagementApi from gs_interactive.api import AdminServiceProcedureManagementApi @@ -32,18 +37,13 @@ from gs_interactive.api import QueryServiceApi from gs_interactive.api import UtilsApi from gs_interactive.api_client import ApiClient -from gs_interactive.configuration import Configuration -from pydantic import Field -from pydantic import StrictBytes -from pydantic import StrictStr -from typing_extensions import Annotated - from gs_interactive.client.generated.results_pb2 import CollectiveResults from gs_interactive.client.result import Result from gs_interactive.client.status import Status from gs_interactive.client.status import StatusCode from gs_interactive.client.utils import InputFormat from gs_interactive.client.utils import append_format_byte +from gs_interactive.configuration import Configuration from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateGraphResponse from gs_interactive.models import CreateProcedureRequest diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 21564752ecdf..26745c230f5f 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -16,16 +16,16 @@ # limitations under the License. # +from urllib3.exceptions import MaxRetryError +from urllib3.exceptions import ProtocolError + from gs_interactive.api_response import ApiResponse +from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.exceptions import ApiException from gs_interactive.exceptions import BadRequestException from gs_interactive.exceptions import ForbiddenException from gs_interactive.exceptions import NotFoundException from gs_interactive.exceptions import ServiceException -from urllib3.exceptions import MaxRetryError -from urllib3.exceptions import ProtocolError - -from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode diff --git a/flex/tests/hqps/interactive_config_test.yaml b/flex/tests/hqps/interactive_config_test.yaml index 0e1f9f15ed21..6013b0079d90 100644 --- a/flex/tests/hqps/interactive_config_test.yaml +++ b/flex/tests/hqps/interactive_config_test.yaml @@ -5,7 +5,7 @@ compute_engine: type: hiactor workers: - localhost:10000 - thread_num_per_worker: 1 + thread_num_per_worker: 4 store: type: cpp-mcsr metadata_store: @@ -26,6 +26,7 @@ compiler: statistics: uri: http://localhost:7777/v1/graph/%s/statistics interval: 86400000 # ms + timeout: 1000 # ms endpoint: default_listen_address: localhost bolt_connector: diff --git a/flex/utils/yaml_utils.cc b/flex/utils/yaml_utils.cc index 775c4245ea2a..3c36a0e85535 100644 --- a/flex/utils/yaml_utils.cc +++ b/flex/utils/yaml_utils.cc @@ -52,12 +52,20 @@ void convert_yaml_node_to_json(const YAML::Node& node, json.SetInt(node.as()); } catch (const YAML::BadConversion& e) { try { - json.SetDouble(node.as()); + json.SetInt64(node.as()); } catch (const YAML::BadConversion& e) { try { - json.SetBool(node.as()); + json.SetUint64(node.as()); } catch (const YAML::BadConversion& e) { - json.SetString(node.as().c_str(), allocator); + try { + json.SetDouble(node.as()); + } catch (const YAML::BadConversion& e) { + try { + json.SetBool(node.as()); + } catch (const YAML::BadConversion& e) { + json.SetString(node.as().c_str(), allocator); + } + } } } } diff --git a/interactive_engine/compiler/conf/ir.compiler.properties b/interactive_engine/compiler/conf/ir.compiler.properties index 992f828a2b2e..2ab6058f579c 100644 --- a/interactive_engine/compiler/conf/ir.compiler.properties +++ b/interactive_engine/compiler/conf/ir.compiler.properties @@ -68,5 +68,8 @@ calcite.default.charset: UTF-8 # set the interval in milliseconds to fetch graph schema # graph.meta.schema.fetch.interval.ms: 1000 -# set the timeout in milliseconds to fetch graph statistics +# set the interval in milliseconds to fetch graph statistics # graph.meta.statistics.fetch.interval.ms: 86400000l + +# set the timeout in milliseconds to fetch graph statistics +# graph.meta.fetch.timeout.ms: 1000 diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java index 9ab065f61366..74a7086599bc 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/GraphConfig.java @@ -31,6 +31,9 @@ public class GraphConfig { public static final Config GRAPH_META_STATISTICS_FETCH_INTERVAL_MS = Config.longConfig("graph.meta.statistics.fetch.interval.ms", 24 * 3600 * 1000l); + public static final Config GRAPH_META_FETCH_TIMEOUT_MS = + Config.longConfig("graph.meta.fetch.timeout.ms", 1000); + // an intermediate solution to support foreign key, will be integrated into schema public static final Config GRAPH_FOREIGN_KEY_URI = Config.stringConfig("graph.foreign.key", ""); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java index e55682a86cdb..ec35991327ad 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java @@ -72,6 +72,9 @@ public class YamlConfigs extends Configs { "graph.meta.statistics.fetch.interval.ms", (Configs configs) -> configs.get("compiler.meta.reader.statistics.interval")) + .put( + "graph.meta.fetch.timeout.ms", + (Configs configs) -> configs.get("compiler.meta.reader.timeout")) .put( "graph.store", (Configs configs) -> { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java index 2e12792d7bf1..ba6cfb089d87 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java @@ -57,7 +57,9 @@ public HttpIrMetaReader(Configs configs) { public IrMeta readMeta() throws IOException { try { HttpResponse response = - sendRequest(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); + sendRequest( + GraphConfig.GRAPH_META_SCHEMA_URI.get(configs), + GraphConfig.GRAPH_META_FETCH_TIMEOUT_MS.get(configs)); String res = response.body(); Preconditions.checkArgument( response.statusCode() == 200, @@ -91,7 +93,8 @@ public IrGraphStatistics readStats(GraphId graphId) throws IOException { sendRequest( String.format( GraphConfig.GRAPH_META_STATISTICS_URI.get(configs), - graphId.getId())); + graphId.getId()), + GraphConfig.GRAPH_META_FETCH_TIMEOUT_MS.get(configs)); String res = response.body(); Preconditions.checkArgument( response.statusCode() == 200, @@ -109,7 +112,9 @@ public IrGraphStatistics readStats(GraphId graphId) throws IOException { public boolean syncStatsEnabled(GraphId graphId) throws IOException { try { HttpResponse response = - sendRequest(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs)); + sendRequest( + GraphConfig.GRAPH_META_SCHEMA_URI.get(configs), + GraphConfig.GRAPH_META_FETCH_TIMEOUT_MS.get(configs)); String res = response.body(); Preconditions.checkArgument( response.statusCode() == 200, @@ -122,13 +127,14 @@ public boolean syncStatsEnabled(GraphId graphId) throws IOException { } } - private HttpResponse sendRequest(String requestUri) + private HttpResponse sendRequest(String requestUri, long timeOut) throws IOException, InterruptedException { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(requestUri)) .headers(CONTENT_TYPE, APPLICATION_JSON) .GET() + .timeout(java.time.Duration.ofMillis(timeOut)) .build(); return httpClient.send(request, HttpResponse.BodyHandlers.ofString()); }