From 234eef1bbc2aea272b94e9f20054e3a6b65f21a7 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Tue, 3 Dec 2024 15:25:18 +0800 Subject: [PATCH 01/13] try to use all cpus when starting interactive Committed-by: xiaolei.zl from Dev container --- k8s/dockerfiles/interactive-entrypoint.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/k8s/dockerfiles/interactive-entrypoint.sh b/k8s/dockerfiles/interactive-entrypoint.sh index 009323f6c31f..76715ec609d6 100644 --- a/k8s/dockerfiles/interactive-entrypoint.sh +++ b/k8s/dockerfiles/interactive-entrypoint.sh @@ -57,6 +57,9 @@ function prepare_workspace() { cp /opt/flex/share/interactive_config.yaml $engine_config_path #make sure the line which start with default_graph is changed to default_graph: ${DEFAULT_GRAPH_NAME} sed -i "s/default_graph:.*/default_graph: ${DEFAULT_GRAPH_NAME}/" $engine_config_path + # By default, we occupy the all available cpus + cpus=$(grep -c ^processor /proc/cpuinfo) + sed -i "s/thread_num_per_worker:.*/thread_num_per_worker: ${cpus}/" $engine_config_path echo "Using default graph: ${DEFAULT_GRAPH_NAME} to start the service" # copy the builtin graph From d00ca5026e537c575a7c4f0500a256131d100874 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 4 Dec 2024 14:10:54 +0800 Subject: [PATCH 02/13] enable implicit conversion between i32 and i64 in within Committed-by: xiaolei.zl from Dev container --- .../graph_db/runtime/adhoc/expr_impl.h | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.h b/flex/engines/graph_db/runtime/adhoc/expr_impl.h index 20a764de0696..281735f3f588 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.h +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.h @@ -85,19 +85,25 @@ class WithInExpr : public ExprBase { WithInExpr(const ReadTransaction& txn, const Context& ctx, std::unique_ptr&& key, const common::Value& array) : key_(std::move(key)) { - if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI64Array); - size_t len = array.i64_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i64_array().item(idx)); - } - } else if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI32Array); - size_t len = array.i32_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i32_array().item(idx)); + if constexpr( (std::is_same_v) || (std::is_same_v)) { + // Implicitly convert to T + if (array.item_case() == common::Value::kI64Array) { + size_t len = array.i64_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i64_array().item(idx)); + } + } else if (array.item_case() == common::Value::kI32Array) { + size_t len = array.i32_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i32_array().item(idx)); + } + } else { + LOG(FATAL) << "Fail to construct WithInExpr of type " + << typeid(T).name() << " with array of type " + << array.item_case(); } - } else if constexpr (std::is_same_v) { + } + else if constexpr (std::is_same_v) { CHECK(array.item_case() == common::Value::kStrArray); size_t len = array.str_array().item_size(); for (size_t idx = 0; idx < len; ++idx) { From df1eb99836c24bacff6379dd26381cbe8d21b6b1 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 4 Dec 2024 14:15:05 +0800 Subject: [PATCH 03/13] format Committed-by: xiaolei.zl from Dev container --- flex/engines/graph_db/runtime/adhoc/expr_impl.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.h b/flex/engines/graph_db/runtime/adhoc/expr_impl.h index 281735f3f588..91046e31e19c 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.h +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.h @@ -85,7 +85,8 @@ class WithInExpr : public ExprBase { WithInExpr(const ReadTransaction& txn, const Context& ctx, std::unique_ptr&& key, const common::Value& array) : key_(std::move(key)) { - if constexpr( (std::is_same_v) || (std::is_same_v)) { + if constexpr ((std::is_same_v) || + (std::is_same_v) ) { // Implicitly convert to T if (array.item_case() == common::Value::kI64Array) { size_t len = array.i64_array().item_size(); @@ -102,8 +103,7 @@ class WithInExpr : public ExprBase { << typeid(T).name() << " with array of type " << array.item_case(); } - } - else if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v) { CHECK(array.item_case() == common::Value::kStrArray); size_t len = array.str_array().item_size(); for (size_t idx = 0; idx < len; ++idx) { From 6a8207eef0c884aa1956892718edeba8d980aa1e Mon Sep 17 00:00:00 2001 From: "xiaolei.zl@alibaba-inc.com" Date: Fri, 6 Dec 2024 15:12:30 +0800 Subject: [PATCH 04/13] don't assume graph_id will be always unique Committed-by: xiaolei.zl@alibaba-inc.com from Dev container --- .../common/ir/meta/fetcher/DynamicIrMetaFetcher.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 71be66ef2d20..265d88fe721e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -84,7 +84,10 @@ private synchronized void syncMeta() { (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); // if the graph id or schema version is changed, we need to update the statistics if (this.currentState == null - || !this.currentState.getGraphId().equals(meta.getGraphId()) + // NOTE(lei): We could not use graph id to determine whether the graph is + // changed. Because the graph id is generated by the graph store, it may be + // different. + // || !this.currentState.getGraphId().equals(meta.getGraphId()) || !this.currentState .getSchema() .getVersion() From 0534551c3450761b60ced56a90b1c6861dfbac03 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 9 Dec 2024 14:13:54 +0800 Subject: [PATCH 05/13] debuging Committed-by: xiaolei.zl from Dev container --- .github/workflows/interactive.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 716c79ac2627..0c6bde4fd96c 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -109,7 +109,7 @@ jobs: name: interactive_build-${{ github.sha }} - name: Setup tmate session - if: false + if: true uses: mxschmitt/action-tmate@v3 - name: Extract build artifacts From 01b65df61ff965f12a8e256a49239b72d68c3b5b Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 9 Dec 2024 15:41:00 +0800 Subject: [PATCH 06/13] fixing ci Committed-by: xiaolei.zl from Dev container --- .github/workflows/interactive.yml | 2 +- .../handler/graph_db_http_handler.cc | 2 +- .../handler/graph_db_http_handler.h | 2 +- .../python/gs_interactive/tests/conftest.py | 2 +- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 39 +++++++++---------- 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 0c6bde4fd96c..716c79ac2627 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -109,7 +109,7 @@ jobs: name: interactive_build-${{ github.sha }} - name: Setup tmate session - if: true + if: false uses: mxschmitt/action-tmate@v3 - name: Extract build artifacts diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 319920459058..c530a1590569 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler { bool start() override { if (get_executors()[StoppableHandler::shard_id()].size() > 0) { - LOG(ERROR) << "The actors have been already created!"; + VLOG(1) << "The actors have been already created!"; return false; } return StoppableHandler::start_scope( diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h index 6bc5c906910e..22090e66dc41 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.h +++ b/flex/engines/http_server/handler/graph_db_http_handler.h @@ -69,7 +69,7 @@ class StoppableHandler : public seastar::httpd::handler_base { } catch (const std::exception& e) { // In case the scope is already cancelled, we should ignore the // exception. - LOG(INFO) << "Failed to cancel IC scope: " << e.what(); + VLOG(1) << "Failed to cancel IC scope: " << e.what(); } func(); return seastar::make_ready_future<>(); diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 94f0b34a25f2..34e4372d4f5d 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -478,4 +478,4 @@ def start_service_on_graph(interactive_session, graph_id: str): resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id)) assert resp.is_ok() # wait three second to let compiler get the new graph - time.sleep(3) + time.sleep(10) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 265d88fe721e..4801aaa1ebcf 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -79,28 +79,27 @@ public Optional fetch() { private synchronized void syncMeta() { try { IrMeta meta = this.reader.readMeta(); - logger.debug( + logger.info( "schema from remote: {}", (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); - // if the graph id or schema version is changed, we need to update the statistics - if (this.currentState == null - // NOTE(lei): We could not use graph id to determine whether the graph is - // changed. Because the graph id is generated by the graph store, it may be - // different. - // || !this.currentState.getGraphId().equals(meta.getGraphId()) - || !this.currentState - .getSchema() - .getVersion() - .equals(meta.getSchema().getVersion())) { - this.statsState = StatsState.INITIALIZED; - this.currentState = - new IrMetaStats( - meta.getGraphId(), - meta.getSnapshotId(), - meta.getSchema(), - meta.getStoredProcedures(), - null); - } + // NOTE(lei): We could not use graph id to determine whether the graph is + // changed. Because the graph id is generated by the graph store, it may be + // different. So currently we need to update the schema and statistics every time. + // if (this.currentState == null + // || !this.currentState.getGraphId().equals(meta.getGraphId()) + // || !this.currentState + // .getSchema() + // .getVersion() + // .equals(meta.getSchema().getVersion())) { + this.statsState = StatsState.INITIALIZED; + this.currentState = + new IrMetaStats( + meta.getGraphId(), + meta.getSnapshotId(), + meta.getSchema(), + meta.getStoredProcedures(), + null); + // } boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); if (statsEnabled && this.statsState != StatsState.SYNCED || (!statsEnabled && this.statsState != StatsState.MOCKED)) { From 66542b5b0917de5e46f5fe7b65779875534c415f Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 11 Dec 2024 15:17:44 +0800 Subject: [PATCH 07/13] let graph_id increasing, don't use previous ids Committed-by: xiaolei.zl from Dev container --- .../handler/graph_db_http_handler.cc | 2 +- .../metadata/local_file_metadata_store.cc | 53 +++++++++++-------- .../metadata/local_file_metadata_store.h | 8 ++- .../graphscope/common/ir/meta/GraphId.java | 5 ++ .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 34 ++++++------ 5 files changed, 59 insertions(+), 43 deletions(-) diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index c530a1590569..4ff8c9e540ff 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler { bool start() override { if (get_executors()[StoppableHandler::shard_id()].size() > 0) { - VLOG(1) << "The actors have been already created!"; + VLOG(10) << "The actors have been already created!"; return false; } return StoppableHandler::start_scope( diff --git a/flex/storages/metadata/local_file_metadata_store.cc b/flex/storages/metadata/local_file_metadata_store.cc index 5f77d0cc2e7a..bcff4bf4ed38 100644 --- a/flex/storages/metadata/local_file_metadata_store.cc +++ b/flex/storages/metadata/local_file_metadata_store.cc @@ -181,8 +181,8 @@ Result LocalFileMetadataStore::UpdateMeta(const meta_kind_t& meta_kind, Result LocalFileMetadataStore::get_next_meta_key( - const LocalFileMetadataStore::meta_kind_t& meta_kind) const { - return std::to_string(get_max_id(meta_kind) + 1); + const LocalFileMetadataStore::meta_kind_t& meta_kind) { + return std::to_string(increase_and_get_id(meta_kind)); } std::string LocalFileMetadataStore::get_root_meta_dir() const { @@ -208,29 +208,38 @@ std::string LocalFileMetadataStore::get_meta_file(const meta_kind_t& meta_kind, return ret; } -int32_t LocalFileMetadataStore::get_max_id(const meta_kind_t& meta_kind) const { - // iterate all files in the directory, get the max id. - int max_id_ = 0; +// Guarded by meta_mutex_ outside. +int32_t LocalFileMetadataStore::increase_and_get_id( + const meta_kind_t& meta_kind) { auto dir = get_meta_kind_dir(meta_kind); - for (auto& p : std::filesystem::directory_iterator(dir)) { - if (std::filesystem::is_directory(p)) { - continue; - } - auto file_name = p.path().filename().string(); - if (file_name.find(META_FILE_PREFIX) != std::string::npos) { - auto id_str = file_name.substr(strlen(META_FILE_PREFIX)); - int32_t id; - try { - id = std::stoi(id_str); - } catch (std::invalid_argument& e) { - LOG(ERROR) << "Invalid id: " << id_str; - continue; - } - if (id > max_id_) { - max_id_ = id; - } + int max_id_ = 0; + // In the directory, we expect a file with name CUR_ID_FILE_NAME. + // If the file does not exist, we will create one with content "0". + auto cur_id_file = dir + "/" + CUR_ID_FILE_NAME; + if (!std::filesystem::exists(cur_id_file)) { + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to create file: " << cur_id_file; + return -1; } + out_file << "0"; + out_file.close(); + } + std::ifstream in_file(cur_id_file); + if (!in_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; } + in_file >> max_id_; + in_file.close(); + max_id_++; + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; + } + out_file << max_id_; + out_file.close(); return max_id_; } diff --git a/flex/storages/metadata/local_file_metadata_store.h b/flex/storages/metadata/local_file_metadata_store.h index 186aa68e7efd..bada3778d535 100644 --- a/flex/storages/metadata/local_file_metadata_store.h +++ b/flex/storages/metadata/local_file_metadata_store.h @@ -48,6 +48,7 @@ class LocalFileMetadataStore : public IMetaStore { static constexpr const char* METADATA_DIR = "METADATA"; static constexpr const char* META_FILE_PREFIX = "META_"; + static constexpr const char* CUR_ID_FILE_NAME = "CUR_ID"; LocalFileMetadataStore(const std::string& path); @@ -110,12 +111,15 @@ class LocalFileMetadataStore : public IMetaStore { update_func_t update_func) override; private: - Result get_next_meta_key(const meta_kind_t& meta_kind) const; + Result get_next_meta_key(const meta_kind_t& meta_kind); std::string get_root_meta_dir() const; std::string get_meta_kind_dir(const meta_kind_t& meta_kind) const; std::string get_meta_file(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; - int32_t get_max_id(const meta_kind_t& meta_kind) const; + /** + * For the specified meta_kind, increase the id and return the new id. + */ + int32_t increase_and_get_id(const meta_kind_t& meta_kind); bool is_key_exist(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java index 8ddf0d75a511..4c63425263cd 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java @@ -51,4 +51,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode(id); } + + @Override + public String toString() { + return "GraphId{" + "id=" + id + '}'; + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 4801aaa1ebcf..226be5a2f68b 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -82,24 +82,22 @@ private synchronized void syncMeta() { logger.info( "schema from remote: {}", (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); - // NOTE(lei): We could not use graph id to determine whether the graph is - // changed. Because the graph id is generated by the graph store, it may be - // different. So currently we need to update the schema and statistics every time. - // if (this.currentState == null - // || !this.currentState.getGraphId().equals(meta.getGraphId()) - // || !this.currentState - // .getSchema() - // .getVersion() - // .equals(meta.getSchema().getVersion())) { - this.statsState = StatsState.INITIALIZED; - this.currentState = - new IrMetaStats( - meta.getGraphId(), - meta.getSnapshotId(), - meta.getSchema(), - meta.getStoredProcedures(), - null); - // } + // if the graph id or schema version is changed, we need to update the statistics + if (this.currentState == null + || !this.currentState.getGraphId().equals(meta.getGraphId()) + || !this.currentState + .getSchema() + .getVersion() + .equals(meta.getSchema().getVersion())) { + this.statsState = StatsState.INITIALIZED; + this.currentState = + new IrMetaStats( + meta.getGraphId(), + meta.getSnapshotId(), + meta.getSchema(), + meta.getStoredProcedures(), + null); + } boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); if (statsEnabled && this.statsState != StatsState.SYNCED || (!statsEnabled && this.statsState != StatsState.MOCKED)) { From 260be2dad60b06f436a3356dafeb6e9076f62ee8 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Dec 2024 11:49:43 +0800 Subject: [PATCH 08/13] fixing Committed-by: xiaolei.zl from Dev container --- .../python/gs_interactive/client/result.py | 3 +- .../python/gs_interactive/client/session.py | 12 +++--- .../python/gs_interactive/client/status.py | 8 ++-- .../python/gs_interactive/tests/conftest.py | 24 ++++++++++- .../gs_interactive/tests/test_robustness.py | 42 ++++++++++++++++++- 5 files changed, 76 insertions(+), 13 deletions(-) diff --git a/flex/interactive/sdk/python/gs_interactive/client/result.py b/flex/interactive/sdk/python/gs_interactive/client/result.py index f5aea2b6417f..79ec0d126bb5 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/result.py +++ b/flex/interactive/sdk/python/gs_interactive/client/result.py @@ -20,9 +20,8 @@ from typing import TypeVar from gs_interactive.api_response import ApiResponse -from gs_interactive.exceptions import ApiException - from gs_interactive.client.status import Status +from gs_interactive.exceptions import ApiException # Define a generic type placeholder T = TypeVar("T") diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index 96da79ed7007..95ff753502a2 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -23,6 +23,11 @@ from typing import Optional from typing import Union +from pydantic import Field +from pydantic import StrictBytes +from pydantic import StrictStr +from typing_extensions import Annotated + from gs_interactive.api import AdminServiceGraphManagementApi from gs_interactive.api import AdminServiceJobManagementApi from gs_interactive.api import AdminServiceProcedureManagementApi @@ -32,18 +37,13 @@ from gs_interactive.api import QueryServiceApi from gs_interactive.api import UtilsApi from gs_interactive.api_client import ApiClient -from gs_interactive.configuration import Configuration -from pydantic import Field -from pydantic import StrictBytes -from pydantic import StrictStr -from typing_extensions import Annotated - from gs_interactive.client.generated.results_pb2 import CollectiveResults from gs_interactive.client.result import Result from gs_interactive.client.status import Status from gs_interactive.client.status import StatusCode from gs_interactive.client.utils import InputFormat from gs_interactive.client.utils import append_format_byte +from gs_interactive.configuration import Configuration from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateGraphResponse from gs_interactive.models import CreateProcedureRequest diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 13af0e0ed660..948d9ec65a3d 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -16,15 +16,15 @@ # limitations under the License. # +from urllib3.exceptions import MaxRetryError + from gs_interactive.api_response import ApiResponse +from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.exceptions import ApiException from gs_interactive.exceptions import BadRequestException from gs_interactive.exceptions import ForbiddenException from gs_interactive.exceptions import NotFoundException from gs_interactive.exceptions import ServiceException -from urllib3.exceptions import MaxRetryError - -from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode @@ -108,6 +108,8 @@ def from_exception(exception: ApiException): return Status(StatusCode.INTERNAL_ERROR, exception.body) elif isinstance(exception, MaxRetryError): return Status(StatusCode.INTERNAL_ERROR, exception) + elif isinstance(exception, ProtocolError): + return Status(StatusCode.INTERNAL_ERROR, exception) return Status( StatusCode.UNKNOWN, "Unknown Error from exception " + exception.body ) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 34e4372d4f5d..f5bfec6b27e1 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -27,6 +27,7 @@ from gs_interactive.client.session import Session from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateProcedureRequest +from gs_interactive.models import GetGraphSchemaResponse from gs_interactive.models import SchemaMapping from gs_interactive.models import StartServiceRequest from gs_interactive.models import UpdateProcedureRequest @@ -478,4 +479,25 @@ def start_service_on_graph(interactive_session, graph_id: str): resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id)) assert resp.is_ok() # wait three second to let compiler get the new graph - time.sleep(10) + time.sleep(3) + + +def ensure_compiler_schema_ready( + interactive_session, neo4j_session: Neo4jSession, graph_id: str +): + rel_graph_meta = interactive_session.get_graph_schema(graph_id).get_value() + max_times = 10 + while True: + if max_times == 0: + raise Exception("compiler schema is not ready") + res = neo4j_session.run("CALL gs.procedure.meta.schema();") + val = res.single().value() + compiler_graph_schema = GetGraphSchemaResponse.from_json(val) + # print("compiler_graph_schema: ", compiler_graph_schema) + # print("rel_graph_meta: ", rel_graph_meta) + if compiler_graph_schema == rel_graph_meta: + break + print("compiler schema is not ready, wait for 1 second") + time.sleep(1) + max_times -= 1 + print("compiler schema is ready") diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index a144a1e07948..2fb50d138f4e 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -27,6 +27,7 @@ from gs_interactive.tests.conftest import call_procedure # noqa: E402 from gs_interactive.tests.conftest import create_procedure from gs_interactive.tests.conftest import delete_procedure +from gs_interactive.tests.conftest import ensure_compiler_schema_ready from gs_interactive.tests.conftest import import_data_to_full_modern_graph from gs_interactive.tests.conftest import import_data_to_partial_modern_graph from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph @@ -61,6 +62,9 @@ def test_query_on_vertex_only_graph( """ print("[Query on vertex only graph]") start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -69,6 +73,10 @@ def test_query_on_vertex_only_graph( import_data_to_vertex_only_modern_graph( interactive_session, create_vertex_only_modern_graph ) + start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -83,12 +91,19 @@ def test_query_on_partial_graph( print("[Query on partial graph]") # start service on new graph start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_partial_modern_graph( interactive_session, create_partial_modern_graph ) + start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) @@ -100,10 +115,17 @@ def test_query_on_full_modern_graph( """ print("[Query on full modern graph]") start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) @@ -129,6 +151,9 @@ def test_service_switching( ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure on graph_b_id @@ -139,6 +164,9 @@ def test_service_switching( "MATCH(n: person) return count(n);", ) start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id) @@ -156,6 +184,9 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure with name containing space, @@ -202,6 +233,9 @@ def test_builtin_procedure(interactive_session, neo4j_session, create_modern_gra ) # Call the builtin procedure start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure( neo4j_session, create_modern_graph, @@ -259,6 +293,10 @@ def test_list_jobs(interactive_session, create_vertex_only_modern_graph): def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_graph): print("[Test call procedure in cypher]") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) result = neo4j_session.run( 'MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;' ) @@ -276,6 +314,9 @@ def test_custom_pk_name( interactive_session, create_graph_with_custom_pk_name ) start_service_on_graph(interactive_session, create_graph_with_custom_pk_name) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_graph_with_custom_pk_name + ) result = neo4j_session.run( "MATCH (n: person) where n.custom_id = 4 return n.custom_id;" ) @@ -289,4 +330,3 @@ def test_custom_pk_name( ) records = result.fetch(1) assert len(records) == 1 and records[0]["$f0"] == 2 - start_service_on_graph(interactive_session, "1") From b7f7b84068c19873394829dc7b06cff6f87fdb2d Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Dec 2024 12:08:27 +0800 Subject: [PATCH 09/13] formatting Committed-by: xiaolei.zl from Dev container --- .github/workflows/pr-check.yml | 6 +++++- flex/interactive/sdk/python/gs_interactive/client/status.py | 1 + flex/interactive/sdk/python/setup.cfg | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 7f59f2182263..a66be7b6d96c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -213,11 +213,15 @@ jobs: python3 -m black --check --diff . python3 -m flake8 . popd - pushd flex/interactive/sdk/python + # we need to generate the code first + pushd flex/interactive/sdk + bash generate_sdk.sh -g python + pushd python python3 -m isort --check --diff . python3 -m black --check --diff . python3 -m flake8 . popd + popd - name: Generate Docs shell: bash diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 948d9ec65a3d..baa4964ce83d 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -24,6 +24,7 @@ from gs_interactive.exceptions import BadRequestException from gs_interactive.exceptions import ForbiddenException from gs_interactive.exceptions import NotFoundException +from gs_interactive.exceptions import ProtocolError from gs_interactive.exceptions import ServiceException from gs_interactive.models.api_response_with_code import APIResponseWithCode diff --git a/flex/interactive/sdk/python/setup.cfg b/flex/interactive/sdk/python/setup.cfg index 1e1839250158..255781291114 100644 --- a/flex/interactive/sdk/python/setup.cfg +++ b/flex/interactive/sdk/python/setup.cfg @@ -3,7 +3,7 @@ profile = black ensure_newline_before_comments = True line_length = 88 force_single_line = True -skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py, +skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,gs_interactive/api_client.py,gs_interactive/__init__.py,gs_interactive/rest.py skip_glob = *_pb2.py,*_pb2_grpc.py,build/* [flake8] From f1caaa0bafb156142edcd1c946d2b7827e0dc6ed Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Dec 2024 14:33:27 +0800 Subject: [PATCH 10/13] format Committed-by: xiaolei.zl from Dev container --- .../sdk/python/gs_interactive/client/result.py | 3 ++- .../sdk/python/gs_interactive/client/session.py | 12 ++++++------ .../sdk/python/gs_interactive/client/status.py | 6 +++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flex/interactive/sdk/python/gs_interactive/client/result.py b/flex/interactive/sdk/python/gs_interactive/client/result.py index 79ec0d126bb5..f5aea2b6417f 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/result.py +++ b/flex/interactive/sdk/python/gs_interactive/client/result.py @@ -20,9 +20,10 @@ from typing import TypeVar from gs_interactive.api_response import ApiResponse -from gs_interactive.client.status import Status from gs_interactive.exceptions import ApiException +from gs_interactive.client.status import Status + # Define a generic type placeholder T = TypeVar("T") diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index 95ff753502a2..96da79ed7007 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -23,11 +23,6 @@ from typing import Optional from typing import Union -from pydantic import Field -from pydantic import StrictBytes -from pydantic import StrictStr -from typing_extensions import Annotated - from gs_interactive.api import AdminServiceGraphManagementApi from gs_interactive.api import AdminServiceJobManagementApi from gs_interactive.api import AdminServiceProcedureManagementApi @@ -37,13 +32,18 @@ from gs_interactive.api import QueryServiceApi from gs_interactive.api import UtilsApi from gs_interactive.api_client import ApiClient +from gs_interactive.configuration import Configuration +from pydantic import Field +from pydantic import StrictBytes +from pydantic import StrictStr +from typing_extensions import Annotated + from gs_interactive.client.generated.results_pb2 import CollectiveResults from gs_interactive.client.result import Result from gs_interactive.client.status import Status from gs_interactive.client.status import StatusCode from gs_interactive.client.utils import InputFormat from gs_interactive.client.utils import append_format_byte -from gs_interactive.configuration import Configuration from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateGraphResponse from gs_interactive.models import CreateProcedureRequest diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index baa4964ce83d..625d252aae95 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -16,16 +16,16 @@ # limitations under the License. # -from urllib3.exceptions import MaxRetryError - from gs_interactive.api_response import ApiResponse -from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.exceptions import ApiException from gs_interactive.exceptions import BadRequestException from gs_interactive.exceptions import ForbiddenException from gs_interactive.exceptions import NotFoundException from gs_interactive.exceptions import ProtocolError from gs_interactive.exceptions import ServiceException +from urllib3.exceptions import MaxRetryError + +from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode From 9dcc7010fdeb442c88fb44a1576203429d6af881 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Dec 2024 17:24:31 +0800 Subject: [PATCH 11/13] fix Committed-by: xiaolei.zl from Dev container --- flex/interactive/sdk/python/gs_interactive/client/status.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 625d252aae95..21564752ecdf 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -21,9 +21,9 @@ from gs_interactive.exceptions import BadRequestException from gs_interactive.exceptions import ForbiddenException from gs_interactive.exceptions import NotFoundException -from gs_interactive.exceptions import ProtocolError from gs_interactive.exceptions import ServiceException from urllib3.exceptions import MaxRetryError +from urllib3.exceptions import ProtocolError from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode From 91317857fad38c85e51eacb5a242b1200f598a4c Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Dec 2024 19:45:05 +0800 Subject: [PATCH 12/13] fix Committed-by: xiaolei.zl from Dev container --- .../sdk/python/gs_interactive/tests/conftest.py | 9 +++++---- flex/tests/hqps/hqps_robust_test.sh | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index f5bfec6b27e1..3617115b4740 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -17,6 +17,7 @@ # # get the directory of the current file +import copy import os import time @@ -40,7 +41,7 @@ modern_graph_full = { - "name": "modern_graph", + "name": "full_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -121,7 +122,7 @@ } modern_graph_vertex_only = { - "name": "modern_graph", + "name": "vertex_only", "description": "This is a test graph, only contains vertex", "schema": { "vertex_types": [ @@ -149,7 +150,7 @@ } modern_graph_partial = { - "name": "modern_graph", + "name": "partial_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -337,7 +338,7 @@ def create_partial_modern_graph(interactive_session): @pytest.fixture(scope="function") def create_graph_with_custom_pk_name(interactive_session): - modern_graph_custom_pk_name = modern_graph_full.copy() + modern_graph_custom_pk_name = copy.deepcopy(modern_graph_full) for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]: vertex_type["properties"][0]["property_name"] = "custom_id" vertex_type["primary_keys"] = ["custom_id"] diff --git a/flex/tests/hqps/hqps_robust_test.sh b/flex/tests/hqps/hqps_robust_test.sh index 8090d8be8164..af21a57459ac 100644 --- a/flex/tests/hqps/hqps_robust_test.sh +++ b/flex/tests/hqps/hqps_robust_test.sh @@ -82,7 +82,7 @@ start_engine_service(){ fi cmd="${SERVER_BIN} -c ${config_path} --enable-admin-service true " - cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true &" + cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true > /tmp/engine.log 2>&1 & " echo "Start engine service with command: ${cmd}" eval ${cmd} From 4265ca59b4dd1da541e6ba5b06602b4c3fbb7fde Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 19 Dec 2024 10:38:48 +0800 Subject: [PATCH 13/13] minor Committed-by: xiaolei.zl from Dev container --- .../graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 226be5a2f68b..71be66ef2d20 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -79,7 +79,7 @@ public Optional fetch() { private synchronized void syncMeta() { try { IrMeta meta = this.reader.readMeta(); - logger.info( + logger.debug( "schema from remote: {}", (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); // if the graph id or schema version is changed, we need to update the statistics