diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 0bd7efd05ebb..e90f91e164bd 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -109,9 +109,10 @@ jobs: sudo chmod +x /usr/bin/clang-format # run format - cd analytical_engine/ + pushd analytical_engine/ find ./apps ./benchmarks ./core ./frame ./misc ./test -name "*.h" | xargs clang-format -i --style=file find ./apps ./benchmarks ./core ./frame ./misc ./test -name "*.cc" | xargs clang-format -i --style=file + popd # validate format function prepend() { while read line; do echo "${1}${line}"; done; } @@ -137,6 +138,33 @@ jobs: exit -1 fi + push flex + # except for files end with act.h + find ./bin ./codegen ./engines ./storages ./engines ./tests ./utils ./otel -name "*.h" ! -name "*act.h" ! -name "*actg.h" | xargs clang-format -i --style=file + find ./bin ./codegen ./engines ./storages ./engines ./tests ./utils ./otel -name "*.cc" ! -name "*act.cc" | xargs clang-format -i --style=file + popd + + GIT_DIFF=$(git diff --ignore-submodules) + if [[ -n $GIT_DIFF ]]; then + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + echo "| clang-format failures found!" + echo "|" + echo "$GIT_DIFF" | prepend "| " + echo "|" + echo "| Run: " + echo "|" + echo "| cd flex && make flex_clformat" + echo "|" + echo "| to fix this error." + echo "|" + echo "| Ensure you are working with clang-format-8, which can be obtained from" + echo "|" + echo "| https://github.com/muttleyxd/clang-tools-static-binaries/releases" + echo "|" + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + exit -1 + fi + - name: Java Format and Lint Check run: | wget https://github.com/google/google-java-format/releases/download/v1.13.0/google-java-format-1.13.0-all-deps.jar @@ -184,6 +212,14 @@ jobs: python3 -m isort --check --diff . python3 -m black --check --diff . python3 -m flake8 . + popd + pushd flex/interactive/sdk/python/gs_interactive + python3 -m isort --check --diff . + python3 -m black --check --diff . + # only check client and tests, to avoid checking generated code under api, client/generated, etc. + python3 -m flake8 ./client + python3 -m flake8 ./tests + popd - name: Generate Docs shell: bash diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 2ebfa4f0e6b2..9a5d35065295 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -249,6 +249,8 @@ install(EXPORT flex-targets file(GLOB_RECURSE FILES_NEED_LINT "engines/*.cc" "engines/*.h" + "codegen/*.cc" + "codegen/*.h" "bin/*.cc" "storages/*.h" "storages/*.cc" @@ -256,6 +258,8 @@ file(GLOB_RECURSE FILES_NEED_LINT "otel/*.cc" "test/*.h" "test/*.cc" + "utils/*.h" + "utils/*.cc" EXCEPT "*.act.h" "*.actg.h" "*.autogen.h" "*.autogen.cc") list(FILTER FILES_NEED_LINT EXCLUDE REGEX ".*\.act.h$|.*\.actg.h$|.*\.autogen.h$|.*\.autogen.cc$") # gsa_clformat diff --git a/flex/codegen/src/building_context.h b/flex/codegen/src/building_context.h index b4135943c86f..414b9add95e4 100644 --- a/flex/codegen/src/building_context.h +++ b/flex/codegen/src/building_context.h @@ -126,9 +126,7 @@ struct TagIndMapping { tag_id_2_tag_inds_[tag_id] != -1; } - int32_t GetMaxTagId() const { - return tag_id_2_tag_inds_.size() - 1; - } + int32_t GetMaxTagId() const { return tag_id_2_tag_inds_.size() - 1; } // convert tag_ind (us) to tag ids std::vector tag_ind_2_tag_ids_; diff --git a/flex/codegen/src/string_utils.h b/flex/codegen/src/string_utils.h index 0fd455e732fc..b4a0cfa74f12 100644 --- a/flex/codegen/src/string_utils.h +++ b/flex/codegen/src/string_utils.h @@ -82,7 +82,6 @@ std::string res_alias_to_append_opt(int res_alias, int in_alias) { } } - template std::string ensure_label_id(LabelIdT label_id) { return std::string(LABEL_ID_T_CASTER) + std::string(" ") + diff --git a/flex/engines/graph_db/database/graph_db_operations.h b/flex/engines/graph_db/database/graph_db_operations.h index 78da9ae61802..bc03efbd1cec 100644 --- a/flex/engines/graph_db/database/graph_db_operations.h +++ b/flex/engines/graph_db/database/graph_db_operations.h @@ -19,7 +19,6 @@ #include #include - #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" #include "utils/result.h" diff --git a/flex/engines/graph_db/runtime/common/operators/dedup.h b/flex/engines/graph_db/runtime/common/operators/dedup.h index 37810c701bfa..fbf192232bb5 100644 --- a/flex/engines/graph_db/runtime/common/operators/dedup.h +++ b/flex/engines/graph_db/runtime/common/operators/dedup.h @@ -26,7 +26,6 @@ namespace gs { namespace runtime { - class Dedup { public: static void dedup(const ReadTransaction& txn, Context& ctx, diff --git a/flex/engines/hqps_db/structures/collection.h b/flex/engines/hqps_db/structures/collection.h index 0c9addabc95e..3820d2845959 100644 --- a/flex/engines/hqps_db/structures/collection.h +++ b/flex/engines/hqps_db/structures/collection.h @@ -434,9 +434,7 @@ class CountBuilder { return true; } - Collection Build() { - return Collection(std::move(vec_)); - } + Collection Build() { return Collection(std::move(vec_)); } private: std::vector vec_; diff --git a/flex/interactive/sdk/examples/python/basic_example.py b/flex/interactive/sdk/examples/python/basic_example.py index 76be355e2562..acb478fec44f 100644 --- a/flex/interactive/sdk/examples/python/basic_example.py +++ b/flex/interactive/sdk/examples/python/basic_example.py @@ -15,14 +15,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import time import argparse import os +import time + from gs_interactive.client.driver import Driver from gs_interactive.client.session import Session from gs_interactive.models import * -MODERN_GRAPH_CSV_DIR=os.path.join(os.path.dirname(__file__), "../../../../interactive/examples/modern_graph") +MODERN_GRAPH_CSV_DIR = os.path.join( + os.path.dirname(__file__), "../../../../interactive/examples/modern_graph" +) # get current dir test_graph_def = { @@ -72,12 +75,7 @@ } test_graph_datasource = { - "loading_config": { - "data_source" : { - "scheme": "file" - }, - "import_option" : "init" - }, + "loading_config": {"data_source": {"scheme": "file"}, "import_option": "init"}, "vertex_mappings": [ { "type_name": "person", @@ -96,9 +94,7 @@ "source_vertex": "person", "destination_vertex": "person", }, - "inputs": [ - f"@{MODERN_GRAPH_CSV_DIR}/person_knows_person.csv" - ], + "inputs": [f"@{MODERN_GRAPH_CSV_DIR}/person_knows_person.csv"], "source_vertex_mappings": [ {"column": {"index": 0, "name": "person.id"}, "property": "id"} ], @@ -112,6 +108,7 @@ ], } + def createGraph(sess: Session): create_graph_request = CreateGraphRequest.from_dict(test_graph_def) resp = sess.create_graph(create_graph_request) @@ -169,7 +166,9 @@ def addVertex(sess: Session, graph_id: str): if api_response.is_ok(): print("The response of add_vertex:\n", api_response) else: - raise Exception("add_vertex failed with error: %s" % api_response.get_status_message()) + raise Exception( + "add_vertex failed with error: %s" % api_response.get_status_message() + ) def updateVertex(sess: Session, graph_id: str): @@ -182,7 +181,9 @@ def updateVertex(sess: Session, graph_id: str): if api_response.is_ok(): print("The response of update_vertex", api_response) else: - raise Exception("update_vertex failed with error: %s" % api_response.get_status_message()) + raise Exception( + "update_vertex failed with error: %s" % api_response.get_status_message() + ) def getVertex(sess: Session, graph_id: str): @@ -192,7 +193,9 @@ def getVertex(sess: Session, graph_id: str): if api_response.is_ok(): print("The response of get_vertex", api_response) else: - raise Exception("get_vertex failed with error: %s" % api_response.get_status_message()) + raise Exception( + "get_vertex failed with error: %s" % api_response.get_status_message() + ) def updateEdge(sess: Session, graph_id: str): @@ -230,8 +233,9 @@ def getEdge(sess: Session, graph_id: str): if api_response.is_ok(): print("The response of get_edge", api_response) else: - raise Exception("get_edge failed with error: %s" % api_response.get_status_message()) - + raise Exception( + "get_edge failed with error: %s" % api_response.get_status_message() + ) def addEdge(sess: Session, graph_id: str): @@ -257,8 +261,9 @@ def addEdge(sess: Session, graph_id: str): if api_response.is_ok(): print("The response of add_edge", api_response) else: - raise Exception("add_edge failed with error: %s" % api_response.get_status_message()) - + raise Exception( + "add_edge failed with error: %s" % api_response.get_status_message() + ) if __name__ == "__main__": @@ -299,7 +304,7 @@ def addEdge(sess: Session, graph_id: str): ) resp = sess.create_procedure(graph_id, create_proc_request) assert resp.is_ok() - + get_proc_res = sess.get_procedure(graph_id, "test_procedure") assert get_proc_res.is_ok() # Check the description of the procedure @@ -316,11 +321,11 @@ def addEdge(sess: Session, graph_id: str): result = session.run("CALL test_procedure();") for record in result: print(record) - + addVertex(sess, graph_id) getVertex(sess, graph_id) updateVertex(sess, graph_id) - + addEdge(sess, graph_id) getEdge(sess, graph_id) updateEdge(sess, graph_id) diff --git a/flex/interactive/sdk/python/gs_interactive/client/driver.py b/flex/interactive/sdk/python/gs_interactive/client/driver.py index 385748683bd8..dfd5f366b687 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/driver.py +++ b/flex/interactive/sdk/python/gs_interactive/client/driver.py @@ -17,14 +17,8 @@ # import os -import sys -from gremlin_python import statics from gremlin_python.driver.client import Client -from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection -from gremlin_python.process.graph_traversal import __ -from gremlin_python.process.strategies import * -from gremlin_python.structure.graph import Graph from neo4j import GraphDatabase from neo4j import Session as Neo4jSession @@ -33,10 +27,11 @@ class Driver: """ - The main entry point for the Interactive SDK. With the Interactive endpoints provided, you can create a Interactive Session to interact with the Interactive service, + The main entry point for the Interactive SDK. With the Interactive endpoints provided, + you can create a Interactive Session to interact with the Interactive service, and create a Neo4j Session to interact with the Neo4j service. """ - + def __init__( self, admin_endpoint: str = None, @@ -45,8 +40,8 @@ def __init__( gremlin_endpoint: str = None, ): """ - Construct a new driver using the specified endpoints. - If no endpoints are provided, the driver will read them from environment variables. + Construct a new driver using the specified endpoints. + If no endpoints are provided, the driver will read them from environment variables. You will receive the endpoints after starting the Interactive service. Args: @@ -71,7 +66,7 @@ def close(self): """ if self._neo4j_driver is not None: self._neo4j_driver.close() - + def __del__(self): self.close() @@ -96,21 +91,26 @@ def read_endpoints_from_env(self): self._admin_endpoint = os.environ.get("INTERACTIVE_ADMIN_ENDPOINT") assert ( self._admin_endpoint is not None - ), "INTERACTIVE_ADMIN_ENDPOINT is not set, did you forget to export the environment variable after deploying Interactive? see https://graphscope.io/docs/latest/flex/interactive/installation" + ), "INTERACTIVE_ADMIN_ENDPOINT is not set, " + "did you forget to export the environment variable after deploying Interactive?" + " see https://graphscope.io/docs/latest/flex/interactive/installation" self._stored_proc_endpoint = os.environ.get("INTERACTIVE_STORED_PROC_ENDPOINT") if self._stored_proc_endpoint is None: print( - "INTERACTIVE_STORED_PROC_ENDPOINT is not set, will try to get it from service status endpoint" + "INTERACTIVE_STORED_PROC_ENDPOINT is not set," + "will try to get it from service status endpoint" ) self._cypher_endpoint = os.environ.get("INTERACTIVE_CYPHER_ENDPOINT") if self._cypher_endpoint is None: print( - "INTERACTIVE_CYPHER_ENDPOINT is not set, will try to get it from service status endpoint" + "INTERACTIVE_CYPHER_ENDPOINT is not set," + "will try to get it from service status endpoint" ) self._gremlin_endpoint = os.environ.get("INTERACTIVE_GREMLIN_ENDPOINT") if self._gremlin_endpoint is None: print( - "INTERACTIVE_GREMLIN_ENDPOINT is not set, will try to get it from service status endpoint" + "INTERACTIVE_GREMLIN_ENDPOINT is not set," + "will try to get it from service status endpoint" ) def session(self) -> Session: @@ -133,7 +133,8 @@ def getNeo4jSession(self, **config) -> Neo4jSession: """ Create a neo4j session with the specified endpoints. Args: - config: a dictionary of configuration options. The same as the ones in neo4j.Driver.session + config: a dictionary of configuration options, the same as the ones + in neo4j.Driver.session """ return self.getNeo4jSessionImpl(**config) diff --git a/flex/interactive/sdk/python/gs_interactive/client/result.py b/flex/interactive/sdk/python/gs_interactive/client/result.py index 29e25cbe4bb9..38057443bc83 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/result.py +++ b/flex/interactive/sdk/python/gs_interactive/client/result.py @@ -18,8 +18,6 @@ from typing import Generic, TypeVar -from pydantic import Field - from gs_interactive.api_response import ApiResponse from gs_interactive.client.status import Status from gs_interactive.exceptions import ApiException @@ -30,12 +28,14 @@ class Result(Generic[T]): """ - This is a generic class that wraps the result of an operation. It contains the status of the operation and the value returned by the operation. + This is a generic class that wraps the result of an operation, + It contains the status of the operation and the value returned by the operation. """ + def __init__(self, status: Status, value: T): """ Construct a new Result object with the specified status and value. - + Args: status: the status of the operation. value: the value returned by the operation. diff --git a/flex/interactive/sdk/python/gs_interactive/client/session.py b/flex/interactive/sdk/python/gs_interactive/client/session.py index c69e1a19b647..aea6b68c75d1 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/session.py +++ b/flex/interactive/sdk/python/gs_interactive/client/session.py @@ -17,28 +17,41 @@ # from abc import ABCMeta, abstractmethod -from enum import Enum +from typing import Any, List, Optional, Union -from typing_extensions import Annotated -from typing import Any, Dict, List, Optional, Union from pydantic import Field, StrictBytes, StrictStr +from typing_extensions import Annotated -from gs_interactive.client.status import Status, StatusCode - -from gs_interactive.api import * - +from gs_interactive.api import (AdminServiceGraphManagementApi, + AdminServiceJobManagementApi, + AdminServiceProcedureManagementApi, + AdminServiceServiceManagementApi, + GraphServiceEdgeManagementApi, + GraphServiceVertexManagementApi, + QueryServiceApi, UtilsApi) from gs_interactive.api_client import ApiClient from gs_interactive.client.generated.results_pb2 import CollectiveResults from gs_interactive.client.result import Result from gs_interactive.client.status import Status, StatusCode +from gs_interactive.client.utils import InputFormat, append_format_byte from gs_interactive.configuration import Configuration -from gs_interactive.models import * -from gs_interactive.client.utils import append_format_byte, InputFormat -from gs_interactive.client.generated.results_pb2 import CollectiveResults +from gs_interactive.models import (CreateGraphRequest, CreateGraphResponse, + CreateProcedureRequest, + CreateProcedureResponse, EdgeRequest, + GetGraphResponse, GetGraphSchemaResponse, + GetGraphStatisticsResponse, + GetProcedureResponse, JobResponse, + JobStatus, QueryRequest, SchemaMapping, + ServiceStatus, StartServiceRequest, + UpdateProcedureRequest, UploadFileResponse, + VertexEdgeRequest, VertexRequest) + class EdgeInterface(metaclass=ABCMeta): @abstractmethod - def add_edge(self, graph_id: StrictStr, edge_request: List[EdgeRequest]) -> Result[str]: + def add_edge( + self, graph_id: StrictStr, edge_request: List[EdgeRequest] + ) -> Result[str]: raise NotImplementedError @abstractmethod @@ -64,9 +77,7 @@ def delete_edge( def get_edge( self, graph_id: StrictStr, - edge_label: Annotated[ - StrictStr, Field(description="The label name of edge.") - ], + edge_label: Annotated[StrictStr, Field(description="The label name of edge.")], src_label: Annotated[ StrictStr, Field(description="The label name of src vertex.") ], @@ -282,15 +293,15 @@ class DefaultSession(Session): The default session implementation for Interactive SDK. It provides the implementation of all service APIs. """ - + def __init__(self, admin_uri: str, stored_proc_uri: str = None): """ Construct a new session using the specified admin_uri and stored_proc_uri. - + Args: admin_uri (str): the uri for the admin service. - stored_proc_uri (str, optional): the uri for the stored procedure service. If not provided, - the uri will be read from the service status. + stored_proc_uri (str, optional): the uri for the stored procedure service. + If not provided,the uri will be read from the service status. """ self._client = ApiClient(Configuration(host=admin_uri)) @@ -324,7 +335,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._client.__exit__(exc_type=exc_type, exc_value=exc_val, traceback=exc_tb) # implementations of the methods from the interfaces - ################ Vertex Interfaces ########## def add_vertex( self, graph_id: StrictStr, @@ -335,11 +345,12 @@ def add_vertex( """ graph_id = self.ensure_param_str("graph_id", graph_id) try: - api_response = self._vertex_api.add_vertex_with_http_info(graph_id, vertex_edge_request) + api_response = self._vertex_api.add_vertex_with_http_info( + graph_id, vertex_edge_request + ) return Result.from_response(api_response) except Exception as e: return Result.from_exception(e) - def delete_vertex( self, @@ -362,38 +373,44 @@ def get_vertex( """ Get a vertex from the specified graph with primary key value. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - api_response = self._vertex_api.get_vertex_with_http_info(graph_id, label, primary_key_value) + api_response = self._vertex_api.get_vertex_with_http_info( + graph_id, label, primary_key_value + ) return Result.from_response(api_response) except Exception as e: return Result.from_exception(e) - def update_vertex( self, graph_id: StrictStr, vertex_request: VertexRequest ) -> Result[str]: """ Update a vertex in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - api_response = self._vertex_api.update_vertex_with_http_info(graph_id, vertex_request) + api_response = self._vertex_api.update_vertex_with_http_info( + graph_id, vertex_request + ) return Result.from_response(api_response) except Exception as e: return Result.from_exception(e) - ################ Edge Interfaces ########## - def add_edge(self, graph_id: StrictStr, edge_request: List[EdgeRequest]) -> Result[str]: + def add_edge( + self, graph_id: StrictStr, edge_request: List[EdgeRequest] + ) -> Result[str]: """ Add an edge to the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - api_response = self._edge_api.add_edge_with_http_info(graph_id, edge_request) + api_response = self._edge_api.add_edge_with_http_info( + graph_id, edge_request + ) return Result.from_response(api_response) except Exception as e: return Result.from_exception(e) @@ -419,9 +436,7 @@ def delete_edge( def get_edge( self, graph_id: StrictStr, - edge_label: Annotated[ - StrictStr, Field(description="The label name of edge.") - ], + edge_label: Annotated[StrictStr, Field(description="The label name of edge.")], src_label: Annotated[ StrictStr, Field(description="The label name of src vertex.") ], @@ -438,11 +453,16 @@ def get_edge( """ Get an edge from the specified graph with primary key value. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: api_response = self._edge_api.get_edge_with_http_info( - graph_id, edge_label, src_label, src_primary_key_value, dst_label, dst_primary_key_value + graph_id, + edge_label, + src_label, + src_primary_key_value, + dst_label, + dst_primary_key_value, ) return Result.from_response(api_response) except Exception as e: @@ -454,20 +474,21 @@ def update_edge( """ Update an edge in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - api_response = self._edge_api.update_edge_with_http_info(graph_id, edge_request) + api_response = self._edge_api.update_edge_with_http_info( + graph_id, edge_request + ) return Result.from_response(api_response) except Exception as e: return Result.from_exception(e) - ################ Graph Interfaces ########## def create_graph(self, graph: CreateGraphRequest) -> Result[CreateGraphResponse]: """ Create a new graph with the specified graph request. """ - + try: response = self._graph_api.create_graph_with_http_info(graph) return Result.from_response(response) @@ -483,7 +504,8 @@ def get_graph_schema( Parameters: graph_id (str): The ID of the graph whose schema is to be retrieved. Returns: - Result[GetGraphSchemaResponse]: The result containing the schema of the specified graph. + Result[GetGraphSchemaResponse]: The result containing the schema of + the specified graph. """ graph_id = self.ensure_param_str("graph_id", graph_id) try: @@ -499,7 +521,7 @@ def get_graph_meta( """ Get the meta information of a specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._graph_api.get_graph_with_http_info(graph_id) @@ -514,7 +536,7 @@ def get_graph_statistics( """ Get the statistics of a specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._graph_api.get_graph_statistic_with_http_info(graph_id) @@ -529,7 +551,7 @@ def delete_graph( """ Delete a graph with the specified graph id. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._graph_api.delete_graph_with_http_info(graph_id) @@ -541,7 +563,7 @@ def list_graphs(self) -> Result[List[GetGraphResponse]]: """ List all graphs. """ - + try: response = self._graph_api.list_graphs_with_http_info() return Result.from_response(response) @@ -556,7 +578,7 @@ def bulk_loading( """ Submit a bulk loading job to the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) # First try to upload the input files if they are specified with a starting @ # return a new schema_mapping with the uploaded files @@ -573,14 +595,13 @@ def bulk_loading( except Exception as e: return Result.from_exception(e) - ################ Procedure Interfaces ########## def create_procedure( self, graph_id: StrictStr, procedure: CreateProcedureRequest ) -> Result[CreateProcedureResponse]: """ Create a new procedure in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._procedure_api.create_procedure_with_http_info( @@ -612,7 +633,7 @@ def list_procedures( """ List all procedures in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._procedure_api.list_procedures_with_http_info(graph_id) @@ -641,7 +662,7 @@ def get_procedure( """ Get a procedure in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: response = self._procedure_api.get_procedure_with_http_info( @@ -657,14 +678,17 @@ def call_procedure( """ Call a procedure in the specified graph. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - # gs_interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # gs_interactive currently support four type of inputformat, + # see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in json format response = self._query_api.call_proc_with_http_info( - graph_id = graph_id, - body=append_format_byte(params.to_json().encode(), InputFormat.CYPHER_JSON) + graph_id=graph_id, + body=append_format_byte( + params.to_json().encode(), InputFormat.CYPHER_JSON + ), ) result = CollectiveResults() if response.status_code == 200: @@ -679,12 +703,15 @@ def call_procedure_current(self, params: QueryRequest) -> Result[CollectiveResul """ Call a procedure in the current graph. """ - + try: - # gs_interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # gs_interactive currently support four type of inputformat, + # see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in json format response = self._query_api.call_proc_current_with_http_info( - body = append_format_byte(params.to_json().encode(), InputFormat.CYPHER_JSON) + body=append_format_byte( + params.to_json().encode(), InputFormat.CYPHER_JSON + ) ) result = CollectiveResults() if response.status_code == 200: @@ -699,14 +726,15 @@ def call_procedure_raw(self, graph_id: StrictStr, params: bytes) -> Result[str]: """ Call a procedure in the specified graph with raw bytes. """ - + graph_id = self.ensure_param_str("graph_id", graph_id) try: - # gs_interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # gs_interactive currently support four type of inputformat, + # see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in encoder/decoder format response = self._query_api.call_proc_with_http_info( - graph_id = graph_id, - body = append_format_byte(params, InputFormat.CPP_ENCODER) + graph_id=graph_id, + body=append_format_byte(params, InputFormat.CPP_ENCODER), ) return Result.from_response(response) except Exception as e: @@ -716,23 +744,23 @@ def call_procedure_current_raw(self, params: bytes) -> Result[str]: """ Call a procedure in the current graph with raw bytes. """ - + try: - # gs_interactive currently support four type of inputformat, see flex/engines/graph_db/graph_db_session.h + # gs_interactive currently support four type of inputformat, + # see flex/engines/graph_db/graph_db_session.h # Here we add byte of value 1 to denote the input format is in encoder/decoder format response = self._query_api.call_proc_current_with_http_info( - body = append_format_byte(params, InputFormat.CPP_ENCODER) + body=append_format_byte(params, InputFormat.CPP_ENCODER) ) return Result.from_response(response) except Exception as e: return Result.from_exception(e) - ################ QueryService Interfaces ########## def get_service_status(self) -> Result[ServiceStatus]: """ Get the status of the service. """ - + try: response = self._service_api.get_service_status_with_http_info() return Result.from_response(response) @@ -749,7 +777,7 @@ def start_service( """ Start the service on a specified graph. """ - + try: response = self._service_api.start_service_with_http_info( start_service_request @@ -762,7 +790,7 @@ def stop_service(self) -> Result[str]: """ Stop the service. """ - + try: response = self._service_api.stop_service_with_http_info() return Result.from_response(response) @@ -773,19 +801,18 @@ def restart_service(self) -> Result[str]: """ Restart the service. """ - + try: response = self._service_api.restart_service_with_http_info() return Result.from_response(response) except Exception as e: return Result.from_exception(e) - ################ Job Interfaces ########## def get_job(self, job_id: StrictStr) -> Result[JobStatus]: """ Get the status of a job with the specified job id. """ - + job_id = self.ensure_param_str("job_id", job_id) try: response = self._job_api.get_job_by_id_with_http_info(job_id) @@ -797,7 +824,7 @@ def list_jobs(self) -> Result[List[JobResponse]]: """ List all jobs. """ - + try: response = self._job_api.list_jobs_with_http_info() return Result.from_response(response) @@ -808,7 +835,7 @@ def cancel_job(self, job_id: StrictStr) -> Result[str]: """ Cancel a job with the specified job id. """ - + job_id = self.ensure_param_str("job_id", job_id) try: response = self._job_api.delete_job_by_id_with_http_info(job_id) @@ -822,7 +849,7 @@ def upload_file( """ Upload a file to the server. """ - + try: print("uploading file: ", filestorage) response = self._utils_api.upload_file_with_http_info(filestorage) @@ -839,8 +866,10 @@ def upload_file( def trim_path(self, path: str) -> str: return path[1:] if path.startswith("@") else path - - def preprocess_inputs(self, location: str, inputs: List[str], schema_mapping: SchemaMapping): + + def preprocess_inputs( + self, location: str, inputs: List[str], schema_mapping: SchemaMapping + ): root_dir_marked_with_at = False if location and location.startswith("@"): root_dir_marked_with_at = True @@ -851,20 +880,22 @@ def preprocess_inputs(self, location: str, inputs: List[str], schema_mapping: Sc if input.startswith("@"): print( "Root location given without @, but the input file starts with @" - + input + ", index: " + str(i), + + input + + ", index: " + + str(i), ) return Result.error( Status( StatusCode.BAD_REQUEST, "Root location given without @, but the input file starts with @" - + input + ", index: " + str(i), + + input + + ", index: " + + str(i), ), new_inputs, ) if location: - new_inputs.append( - location + "/" + self.trim_path(input) - ) + new_inputs.append(location + "/" + self.trim_path(input)) else: new_inputs.append(input) return Result.ok(new_inputs) @@ -881,7 +912,9 @@ def check_file_mixup(self, schema_mapping: SchemaMapping) -> Result[SchemaMappin if schema_mapping.vertex_mappings: for vertex_mapping in schema_mapping.vertex_mappings: if vertex_mapping.inputs: - preprocess_result = self.preprocess_inputs(location, vertex_mapping.inputs, schema_mapping) + preprocess_result = self.preprocess_inputs( + location, vertex_mapping.inputs, schema_mapping + ) if not preprocess_result.is_ok(): return Result.error(preprocess_result.status, schema_mapping) vertex_mapping.inputs = preprocess_result.get_value() @@ -889,7 +922,9 @@ def check_file_mixup(self, schema_mapping: SchemaMapping) -> Result[SchemaMappin if schema_mapping.edge_mappings: for edge_mapping in schema_mapping.edge_mappings: if edge_mapping.inputs: - preprocess_result = self.preprocess_inputs(location, edge_mapping.inputs, schema_mapping) + preprocess_result = self.preprocess_inputs( + location, edge_mapping.inputs, schema_mapping + ) if not preprocess_result.is_ok(): return Result.error(preprocess_result.status, schema_mapping) edge_mapping.inputs = preprocess_result.get_value() @@ -912,8 +947,9 @@ def upload_and_replace_input_inplace( self, schema_mapping: SchemaMapping ) -> Result[SchemaMapping]: """ - For each input file in schema_mapping, if the file starts with @, upload the file to the server - and replace the path with the path returned from the server. + For each input file in schema_mapping, if the file starts with @, + upload the file to the server, and replace the path with the + path returned from the server. """ if schema_mapping.vertex_mappings: for vertex_mapping in schema_mapping.vertex_mappings: @@ -941,8 +977,9 @@ def try_upload_files(self, schema_mapping: SchemaMapping) -> Result[SchemaMappin for input files in schema_mapping. Replace the path to the uploaded file with the path returned from the server. - The @ can be added to the beginning of data_source.location in schema_mapping.loading_config - or added to each file in vertex_mappings and edge_mappings. + The @ can be added to the beginning of data_source.location + in schema_mapping.loading_config,or added to each file in vertex_mappings + and edge_mappings. 1. location: @/path/to/dir inputs: @@ -964,7 +1001,8 @@ def try_upload_files(self, schema_mapping: SchemaMapping) -> Result[SchemaMappin inputs: - @/path/to/file1 - @/path/to/file2 - Among the above 4 cases, only the 1, 3, 5 case are valid, for 2,4 the file will not be uploaded + Among the above 4 cases, only the 1, 3, 5 case are valid, + for 2,4 the file will not be uploaded """ check_mixup_res = self.check_file_mixup(schema_mapping) @@ -979,7 +1017,7 @@ def try_upload_files(self, schema_mapping: SchemaMapping) -> Result[SchemaMappin print("new schema_mapping: ", upload_res.get_value()) return Result.ok(upload_res.get_value()) - def ensure_param_str(self, param_name : str, param): + def ensure_param_str(self, param_name: str, param): """ Ensure the param is a string, otherwise raise an exception """ @@ -987,5 +1025,10 @@ def ensure_param_str(self, param_name : str, param): # User may input the graph_id as int, convert it to string if isinstance(param, int): return str(param) - raise Exception("param should be a string, param_name: " + param_name + ", param: " + str(param)) + raise Exception( + "param should be a string, param_name: " + + param_name + + ", param: " + + str(param) + ) return param diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 83350afad440..3dcabb119347 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -16,18 +16,11 @@ # limitations under the License. # - -from enum import Enum - from gs_interactive.api_response import ApiResponse -from gs_interactive.exceptions import ( - ApiException, - BadRequestException, - ForbiddenException, - NotFoundException, - ServiceException, -) from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode +from gs_interactive.exceptions import (ApiException, BadRequestException, + ForbiddenException, NotFoundException, + ServiceException) from gs_interactive.models.api_response_with_code import APIResponseWithCode @@ -35,11 +28,11 @@ class Status: """ This class represents the status of an operation. It contains the status code and the message. """ - + def __init__(self, status: StatusCode, message: str): """ Construct a new Status object with the specified status code and message. - + Args: status (StatusCode): the returnd code of the operation. message (str): the message returned by the operation. @@ -57,7 +50,7 @@ def is_ok(self) -> bool: """ Whether the operation is successful. """ - + return self.status == StatusCode.OK def is_error(self) -> bool: @@ -65,12 +58,12 @@ def is_error(self) -> bool: Whether the operation is failed. """ return self.status != StatusCode.OK - + def get_code(self): """ Get the status code returned by the operation. """ - + return self.status @property @@ -78,7 +71,7 @@ def get_message(self): """ Get the message returned by the operation. """ - + return self.message # static method create a server internal error object @@ -87,7 +80,7 @@ def server_internal_error(message: str): """ Create a server internal error object with the specified message. """ - + return Status(StatusCode.INTERNAL_ERROR, message) @staticmethod @@ -95,7 +88,7 @@ def from_exception(exception: ApiException): """ Create a Status object from an ApiException. """ - + # mapping from ApiException to StatusCode print("exception: ", exception) if isinstance(exception, BadRequestException): @@ -105,7 +98,7 @@ def from_exception(exception: ApiException): elif isinstance(exception, NotFoundException): return Status(StatusCode.NOT_FOUND, exception.body) elif isinstance(exception, ServiceException): - if (exception.status == 503): + if exception.status == 503: return Status(StatusCode.SERVICE_UNAVAILABLE, exception.body) else: return Status(StatusCode.INTERNAL_ERROR, exception.body) @@ -118,7 +111,7 @@ def from_response(response: ApiResponse): """ Create a Status object from an ApiResponse. """ - + # mapping from ApiResponse to StatusCode if response.status_code == 200: return Status(StatusCode.OK, "OK") @@ -126,7 +119,9 @@ def from_response(response: ApiResponse): # If the status_code is not 200, we expect APIReponseWithCode returned from server api_response_with_code = response.data if isinstance(api_response_with_code, APIResponseWithCode): - return Status(api_response_with_code.code, api_response_with_code.message) + return Status( + api_response_with_code.code, api_response_with_code.message + ) return Status(StatusCode.UNKNOWN, "Unknown Error") @staticmethod @@ -134,5 +129,5 @@ def ok(): """ Create a successful status object. """ - + return Status(StatusCode.OK, "OK") diff --git a/flex/interactive/sdk/python/gs_interactive/client/utils.py b/flex/interactive/sdk/python/gs_interactive/client/utils.py index 6bf3a4b89e72..caf3b028cec2 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/utils.py +++ b/flex/interactive/sdk/python/gs_interactive/client/utils.py @@ -22,146 +22,162 @@ class Encoder: """ A simple encoder to encode the data into bytes """ - - def __init__(self, endian = 'little') -> None: + + def __init__(self, endian="little") -> None: self.byte_array = bytearray() self.endian = endian - + def put_int(self, value: int): """ Put an integer into the byte array, 4 bytes """ - + self.byte_array.extend(value.to_bytes(4, byteorder=self.endian)) - + def put_long(self, value: int): """ Put a long integer into the byte array, 8 bytes """ self.byte_array.extend(value.to_bytes(8, byteorder=self.endian)) - + def put_string(self, value: str): """ Put a string into the byte array, first put the length of the string, then the string """ - + self.put_int(len(value)) - self.byte_array.extend(value.encode('utf-8')) - + self.byte_array.extend(value.encode("utf-8")) + def put_byte(self, value: int): """ Put a single byte into the byte array """ - + self.byte_array.extend(value.to_bytes(1, byteorder=self.endian)) - + def put_bytes(self, value: bytes): """ Put a byte array into the byte array """ - + self.byte_array.extend(value) - + def put_double(self, value: float): """ Put a double into the byte array, 8 bytes """ - + self.byte_array.extend(value.to_bytes(8, byteorder=self.endian)) - + def get_bytes(self): """ Get the bytes from the byte array """ - + # return bytes not bytearray return bytes(self.byte_array) + class Decoder: """ A simple decoder to decode the bytes into data """ - - def __init__(self, byte_array: bytearray,endian = 'little') -> None: + + def __init__(self, byte_array: bytearray, endian="little") -> None: self.byte_array = byte_array self.index = 0 self.endian = endian - + def get_int(self) -> int: """ Get an integer from the byte array, 4 bytes returns: int """ - value = int.from_bytes(self.byte_array[self.index:self.index+4], byteorder=self.endian) + value = int.from_bytes( + self.byte_array[self.index : self.index + 4], # noqa E203 + byteorder=self.endian, + ) self.index += 4 return value - + def get_long(self) -> int: """ Get a long integer from the byte array, 8 bytes - + returns: int """ - value = int.from_bytes(self.byte_array[self.index:self.index+8], byteorder=self.endian) + value = int.from_bytes( + self.byte_array[self.index : self.index + 8], # noqa E203 + byteorder=self.endian, + ) self.index += 8 return value - + def get_double(self) -> float: """ Get a double from the byte array, 8 bytes - + returns: float """ - value = float.from_bytes(self.byte_array[self.index:self.index+8], byteorder=self.endian) + value = float.from_bytes( + self.byte_array[self.index : self.index + 8], # noqa E203 + byteorder=self.endian, + ) self.index += 8 return value - + def get_byte(self) -> int: """ Get a single byte from the byte array - + returns: int """ - - value = int.from_bytes(self.byte_array[self.index:self.index+1], byteorder=self.endian) + + value = int.from_bytes( + self.byte_array[self.index : self.index + 1], # noqa E203 + byteorder=self.endian, + ) self.index += 1 return value - + def get_bytes(self, length: int) -> bytes: """ Get a byte array from the byte array - + returns: A byte array """ - value = self.byte_array[self.index:self.index+length] + value = self.byte_array[self.index : self.index + length] # noqa E203 self.index += length return value - + def get_string(self) -> str: """ Get a string from the byte array, first get the length of the string, then the string - + returns: str """ length = self.get_int() - value = self.byte_array[self.index:self.index+length].decode('utf-8') + value = self.byte_array[self.index : self.index + length].decode( # noqa E203 + "utf-8" + ) self.index += length return value - + def is_empty(self) -> bool: """ Whether the byte array is empty """ - + return self.index == len(self.byte_array) - + class InputFormat(Enum): - CPP_ENCODER = 0 # raw bytes encoded by encoder/decoder - CYPHER_JSON = 1 # json format string - CYPHER_PROTO_ADHOC = 2 # protobuf adhoc bytes - CYPHER_PROTO_PROCEDURE = 3 # protobuf procedure bytes + CPP_ENCODER = 0 # raw bytes encoded by encoder/decoder + CYPHER_JSON = 1 # json format string + CYPHER_PROTO_ADHOC = 2 # protobuf adhoc bytes + CYPHER_PROTO_PROCEDURE = 3 # protobuf procedure bytes + def append_format_byte(input: bytes, input_format: InputFormat): """ @@ -169,4 +185,3 @@ def append_format_byte(input: bytes, input_format: InputFormat): """ new_bytes = input + bytes([input_format.value]) return new_bytes - \ No newline at end of file diff --git a/flex/interactive/sdk/python/gs_interactive/models/long_text.py b/flex/interactive/sdk/python/gs_interactive/models/long_text.py index 545999d64ed0..dfda59f4b055 100644 --- a/flex/interactive/sdk/python/gs_interactive/models/long_text.py +++ b/flex/interactive/sdk/python/gs_interactive/models/long_text.py @@ -14,15 +14,16 @@ from __future__ import annotations + +import json import pprint import re # noqa: F401 -import json +from typing import Any, ClassVar, Dict, List, Optional, Set from pydantic import BaseModel, ConfigDict, StrictStr -from typing import Any, ClassVar, Dict, List, Optional -from typing import Optional, Set from typing_extensions import Self + class LongText(BaseModel): """ LongText diff --git a/flex/interactive/sdk/python/gs_interactive/tests/__init__.py b/flex/interactive/sdk/python/gs_interactive/tests/__init__.py index 9a43cecbaa96..618dda69a7be 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/__init__.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/__init__.py @@ -15,4 +15,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # - diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index b4402cbae0bd..07f5799ed974 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -16,20 +16,23 @@ # limitations under the License. # -import time -from gs_interactive.client.driver import Driver -from gs_interactive.models import * -from gs_interactive.client.status import StatusCode -from gs_interactive.client.session import Session -from neo4j import Session as Neo4jSession -import pytest - # get the directory of the current file import os -import sys +import time -cur_dir=os.path.dirname(os.path.abspath(__file__)) -MODERN_GRAPH_DATA_DIR=os.path.abspath(os.path.join(cur_dir, "../../../../examples/modern_graph")) +import pytest +from neo4j import Session as Neo4jSession + +from gs_interactive.client.driver import Driver +from gs_interactive.client.session import Session +from gs_interactive.models import (CreateGraphRequest, CreateProcedureRequest, + SchemaMapping, StartServiceRequest, + UpdateProcedureRequest) + +cur_dir = os.path.dirname(os.path.abspath(__file__)) +MODERN_GRAPH_DATA_DIR = os.path.abspath( + os.path.join(cur_dir, "../../../../examples/modern_graph") +) print("MODERN_GRAPH_DATA_DIR: ", MODERN_GRAPH_DATA_DIR) @@ -73,7 +76,7 @@ }, ], "primary_keys": ["id"], - } + }, ], "edge_types": [ { @@ -109,7 +112,7 @@ } ], "primary_keys": [], - } + }, ], }, } @@ -191,77 +194,61 @@ modern_graph_full_import_config = { "loading_config": { - "data_source": { - "scheme": "file", - "location": "@" + MODERN_GRAPH_DATA_DIR - }, + "data_source": {"scheme": "file", "location": "@" + MODERN_GRAPH_DATA_DIR}, "import_option": "init", "format": { "type": "csv", "metadata": { "delimiter": "|", - } - } + }, + }, }, "vertex_mappings": [ { "type_name": "person", - "inputs": [ - "person.csv" - ], + "inputs": ["person.csv"], }, { "type_name": "software", - "inputs": [ - "software.csv" - ], - } + "inputs": ["software.csv"], + }, ], "edge_mappings": [ { "type_triplet": { "edge": "knows", "source_vertex": "person", - "destination_vertex": "person" + "destination_vertex": "person", }, - "inputs": [ - "person_knows_person.csv" - ], + "inputs": ["person_knows_person.csv"], }, { "type_triplet": { "edge": "created", "source_vertex": "person", - "destination_vertex": "software" + "destination_vertex": "software", }, - "inputs": [ - "person_created_software.csv" - ], - } - ] + "inputs": ["person_created_software.csv"], + }, + ], } modern_graph_partial_import_config = { "loading_config": { - "data_source": { - "scheme": "file", - "location": "@" + MODERN_GRAPH_DATA_DIR - }, + "data_source": {"scheme": "file", "location": "@" + MODERN_GRAPH_DATA_DIR}, "import_option": "init", "format": { "type": "csv", "metadata": { "delimiter": "|", - } - } + }, + }, }, "vertex_mappings": [ { "type_name": "person", - "inputs": [ - "person.csv" - ], + "inputs": ["person.csv"], }, ], "edge_mappings": [ @@ -269,41 +256,36 @@ "type_triplet": { "edge": "knows", "source_vertex": "person", - "destination_vertex": "person" + "destination_vertex": "person", }, - "inputs": [ - "person_knows_person.csv" - ], + "inputs": ["person_knows_person.csv"], } - ] + ], } modern_graph_vertex_only_import_config = { "loading_config": { - "data_source": { - "scheme": "file", - "location": "@" + MODERN_GRAPH_DATA_DIR - }, + "data_source": {"scheme": "file", "location": "@" + MODERN_GRAPH_DATA_DIR}, "import_option": "init", "format": { "type": "csv", "metadata": { "delimiter": "|", - } - } + }, + }, }, "vertex_mappings": [ { "type_name": "person", - "inputs": [ - "person.csv" - ], + "inputs": ["person.csv"], } - ] + ], } + + @pytest.fixture(scope="module") def interactive_driver(): - driver = Driver() + driver = Driver() yield driver driver.close() @@ -312,12 +294,14 @@ def interactive_driver(): def interactive_session(interactive_driver): yield interactive_driver.session() + @pytest.fixture(scope="module") def neo4j_session(interactive_driver): - _neo4j_sess = interactive_driver.getNeo4jSession() + _neo4j_sess = interactive_driver.getNeo4jSession() yield _neo4j_sess _neo4j_sess.close() + @pytest.fixture(scope="module") def create_modern_graph(interactive_session): create_graph_request = CreateGraphRequest.from_dict(modern_graph_full) @@ -337,6 +321,7 @@ def create_vertex_only_modern_graph(interactive_session): yield graph_id delete_running_graph(interactive_session, graph_id) + @pytest.fixture(scope="module") def create_partial_modern_graph(interactive_session): create_graph_request = CreateGraphRequest.from_dict(modern_graph_partial) @@ -346,7 +331,8 @@ def create_partial_modern_graph(interactive_session): yield graph_id delete_running_graph(interactive_session, graph_id) -def wait_job_finish(sess : Session, job_id: str): + +def wait_job_finish(sess: Session, job_id: str): assert job_id is not None while True: resp = sess.get_job(job_id) @@ -360,6 +346,7 @@ def wait_job_finish(sess : Session, job_id: str): else: time.sleep(1) + def import_data_to_vertex_only_modern_graph(sess: Session, graph_id: str): schema_mapping = SchemaMapping.from_dict(modern_graph_vertex_only_import_config) resp = sess.bulk_loading(graph_id, schema_mapping) @@ -367,6 +354,7 @@ def import_data_to_vertex_only_modern_graph(sess: Session, graph_id: str): job_id = resp.get_value().job_id assert wait_job_finish(sess, job_id) + def import_data_to_partial_modern_graph(sess: Session, graph_id: str): schema_mapping = SchemaMapping.from_dict(modern_graph_partial_import_config) resp = sess.bulk_loading(graph_id, schema_mapping) @@ -374,6 +362,7 @@ def import_data_to_partial_modern_graph(sess: Session, graph_id: str): job_id = resp.get_value().job_id assert wait_job_finish(sess, job_id) + def import_data_to_full_modern_graph(sess: Session, graph_id: str): schema_mapping = SchemaMapping.from_dict(modern_graph_full_import_config) resp = sess.bulk_loading(graph_id, schema_mapping) @@ -381,21 +370,25 @@ def import_data_to_full_modern_graph(sess: Session, graph_id: str): job_id = resp.get_value().job_id assert wait_job_finish(sess, job_id) -def submit_query_via_neo4j_endpoint(neo4j_sess : Neo4jSession, graph_id: str, query: str): + +def submit_query_via_neo4j_endpoint( + neo4j_sess: Neo4jSession, graph_id: str, query: str +): result = neo4j_sess.run(query) - #check have 1 records, result 0 + # check have 1 records, result 0 result_cnt = 0 for record in result: print("record: ", record) result_cnt += 1 - print("result count: ", result_cnt , " for query ", query) - + print("result count: ", result_cnt, " for query ", query) -def run_cypher_test_suite(neo4j_sess : Neo4jSession, graph_id: str, queries: list[str]): + +def run_cypher_test_suite(neo4j_sess: Neo4jSession, graph_id: str, queries: list[str]): for query in queries: submit_query_via_neo4j_endpoint(neo4j_sess, graph_id, query) -def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str, *args): + +def call_procedure(neo4j_sess: Neo4jSession, graph_id: str, proc_name: str, *args): query = "CALL " + proc_name + "(" + ",".join(args) + ")" result = neo4j_sess.run(query) for record in result: @@ -421,36 +414,45 @@ def delete_running_graph(sess: Session, graph_id: str): resp = sess.delete_graph(graph_id) assert resp.is_ok() -def create_procedure(sess: Session, graph_id: str, name: str, query: str, description = "test proc"): + +def create_procedure( + sess: Session, graph_id: str, name: str, query: str, description="test proc" +): request = CreateProcedureRequest( - name=name, - description=description, - type="cypher", - query=query) + name=name, description=description, type="cypher", query=query + ) resp = sess.create_procedure(graph_id, request) if not resp.is_ok(): print("Failed to create procedure: ", resp.get_status_message()) - raise Exception("Failed to create procedure, status: ", resp.get_status_message()) + raise Exception( + "Failed to create procedure, status: ", resp.get_status_message() + ) proc_id = resp.get_value().procedure_id return proc_id + def delete_procedure(sess: Session, graph_id: str, proc_id: str): resp = sess.delete_procedure(graph_id, proc_id) if not resp.is_ok(): print("Failed to delete procedure: ", resp.get_status_message()) - raise Exception("Failed to delete procedure, status: ", resp.get_status_message()) + raise Exception( + "Failed to delete procedure, status: ", resp.get_status_message() + ) + -def update_procedure(sess: Session, graph_id: str, proc_id: str, desc : str): - request = UpdateProcedureRequest( - description=desc) +def update_procedure(sess: Session, graph_id: str, proc_id: str, desc: str): + request = UpdateProcedureRequest(description=desc) resp = sess.update_procedure(graph_id, proc_id, request) if not resp.is_ok(): print("Failed to update procedure: ", resp.get_status_message()) - raise Exception("Failed to update procedure, status: ", resp.get_status_message()) + raise Exception( + "Failed to update procedure, status: ", resp.get_status_message() + ) + -def start_service_on_graph(interactive_session, graph_id : str): +def start_service_on_graph(interactive_session, graph_id: str): resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id)) assert resp.is_ok() # wait one second to let compiler get the new graph - time.sleep(1) \ No newline at end of file + time.sleep(1) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_driver.py b/flex/interactive/sdk/python/gs_interactive/tests/test_driver.py index 7db0830e2a56..be1ae1dcd163 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_driver.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_driver.py @@ -18,16 +18,24 @@ import os +import sys import time import unittest -import sys sys.path.append(os.path.join(os.path.dirname(__file__), "../../")) -from gs_interactive.client.driver import Driver -from gs_interactive.models import * -from gs_interactive.client.status import StatusCode - +from gs_interactive.client.driver import Driver # noqa: E402 +from gs_interactive.client.status import StatusCode # noqa: E402 +from gs_interactive.models import ( # noqa: E402 + BaseEdgeTypeVertexTypePairRelationsInner, CreateEdgeType, + CreateGraphRequest, CreateGraphSchemaRequest, CreateProcedureRequest, + CreatePropertyMeta, CreateVertexType, EdgeMapping, EdgeMappingTypeTriplet, + EdgeRequest, GSDataType, LongText, ModelProperty, PrimitiveType, + QueryRequest, SchemaMapping, SchemaMappingLoadingConfig, + SchemaMappingLoadingConfigDataSource, SchemaMappingLoadingConfigFormat, + SchemaMappingLoadingConfigXCsrParams, StartServiceRequest, StringType, + StringTypeString, TypedValue, VertexEdgeRequest, VertexMapping, + VertexRequest) test_graph_def = { "name": "modern_graph", @@ -43,7 +51,7 @@ }, { "property_name": "name", - "property_type": {"string": {"var_char": {"max_length" : 16}}}, + "property_type": {"string": {"var_char": {"max_length": 16}}}, }, { "property_name": "age", @@ -75,6 +83,7 @@ }, } + class TestDriver(unittest.TestCase): """Test usage of driver""" @@ -130,7 +139,7 @@ def test_example(self): # test stop the service, and submit queries self.queryWithServiceStop() self.createDriver() - + def createGraphFromDict(self): create_graph_request = CreateGraphRequest.from_dict(test_graph_def) resp = self._sess.create_graph(create_graph_request) @@ -306,13 +315,13 @@ def bulkLoadingFailure(self): assert resp.is_ok() job_id = resp.get_value().job_id # Expect to fail - assert self.waitJobFinish(job_id) == False + assert not self.waitJobFinish(job_id) def list_graph(self): resp = self._sess.list_graphs() assert resp.is_ok() print("list graph: ", resp.get_value()) - + def get_graph_meta(self): resp = self._sess.get_graph_meta(self._graph_id) assert resp.is_ok() @@ -321,9 +330,8 @@ def get_graph_meta(self): resp = self._sess.get_graph_meta(1) assert resp.is_ok() # Now test calling with a invalid value, will raise exception - with self.assertRaises(Exception) as context: - resp = self._sess.get_graph_meta([1,2,3]) - + with self.assertRaises(Exception): + resp = self._sess.get_graph_meta([1, 2, 3]) def runCypherQuery(self): query = "MATCH (n) RETURN COUNT(n);" @@ -485,7 +493,7 @@ def callProcedure(self): with self._driver.getNeo4jSession() as session: result = session.run("CALL test_procedure();") print("call procedure result: ", result) - + def callPrcedureWithServiceStop(self): # stop service print("stop service: ") @@ -493,9 +501,9 @@ def callPrcedureWithServiceStop(self): assert stop_res.is_ok() # call procedure on stopped service should raise exception with self._driver.getNeo4jSession() as session: - with self.assertRaises(Exception) as context: + with self.assertRaises(Exception): result = session.run("CALL test_procedure();") - # start service + print("call procedure result: ", result) print("start service: ") start_res = self._sess.start_service( start_service_request=StartServiceRequest(graph_id=self._graph_id) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index 093dc03b0aca..a1e23467ebe8 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -20,17 +20,14 @@ import sys import pytest -sys.path.append(os.path.join(os.path.dirname(__file__), "../../")) - -from gs_interactive.client.driver import Driver -from gs_interactive.client.session import Session -from gs_interactive.models import * - -from gs_interactive.tests.conftest import create_vertex_only_modern_graph, start_service_on_graph,interactive_driver -from gs_interactive.tests.conftest import create_procedure,delete_procedure,update_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure -from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph, import_data_to_partial_modern_graph, import_data_to_full_modern_graph +sys.path.append(os.path.join(os.path.dirname(__file__), "../../")) +from gs_interactive.tests.conftest import ( # noqa: E402 + call_procedure, create_procedure, delete_procedure, + import_data_to_full_modern_graph, import_data_to_partial_modern_graph, + import_data_to_vertex_only_modern_graph, run_cypher_test_suite, + start_service_on_graph, update_procedure) vertex_only_cypher_queries = [ "MATCH(n) return count(n)", @@ -40,25 +37,38 @@ # extend the query list to include queries that are not supported by vertex-only graph cypher_queries = vertex_only_cypher_queries + [ - #"MATCH()-[e]->() return count(e)", # currently not supported by compiler+ffi, see https://github.com/alibaba/GraphScope/issues/4192 + # "MATCH()-[e]->() return count(e)", currently not supported by compiler+ffi, + # see https://github.com/alibaba/GraphScope/issues/4192 "MATCH(a)-[b]->(c) return count(b)", "MATCH(a)-[b]->(c) return b", "MATCH(a)-[b]->(c) return c.id", ] -def test_query_on_vertex_only_graph(interactive_session, neo4j_session, create_vertex_only_modern_graph): + +def test_query_on_vertex_only_graph( + interactive_session, neo4j_session, create_vertex_only_modern_graph +): """ Test Query on a graph with only a vertex-only schema defined, no data is imported. """ print("[Query on vertex only graph]") start_service_on_graph(interactive_session, create_vertex_only_modern_graph) - run_cypher_test_suite(neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries) - - start_service_on_graph(interactive_session,"1") - import_data_to_vertex_only_modern_graph(interactive_session, create_vertex_only_modern_graph) - run_cypher_test_suite(neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries) - -def test_query_on_partial_graph(interactive_session,neo4j_session, create_partial_modern_graph): + run_cypher_test_suite( + neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries + ) + + start_service_on_graph(interactive_session, "1") + import_data_to_vertex_only_modern_graph( + interactive_session, create_vertex_only_modern_graph + ) + run_cypher_test_suite( + neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries + ) + + +def test_query_on_partial_graph( + interactive_session, neo4j_session, create_partial_modern_graph +): """ Test Query on a graph with the partial schema of modern graph defined, no data is imported. """ @@ -67,38 +77,59 @@ def test_query_on_partial_graph(interactive_session,neo4j_session, create_partia start_service_on_graph(interactive_session, create_partial_modern_graph) # try to query on the graph run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) - start_service_on_graph(interactive_session,"1") - import_data_to_partial_modern_graph(interactive_session, create_partial_modern_graph) + start_service_on_graph(interactive_session, "1") + import_data_to_partial_modern_graph( + interactive_session, create_partial_modern_graph + ) run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) - -def test_query_on_full_modern_graph(interactive_session, neo4j_session, create_modern_graph): + + +def test_query_on_full_modern_graph( + interactive_session, neo4j_session, create_modern_graph +): """ Test Query on a graph with full schema of modern graph defined, no data is imported. """ print("[Query on full modern graph]") - start_service_on_graph(interactive_session,create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) # try to query on the graph run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) - start_service_on_graph(interactive_session,"1") + start_service_on_graph(interactive_session, "1") import_data_to_full_modern_graph(interactive_session, create_modern_graph) run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) - -def test_service_switching(interactive_session,neo4j_session, create_modern_graph, create_vertex_only_modern_graph ): + +def test_service_switching( + interactive_session, + neo4j_session, + create_modern_graph, + create_vertex_only_modern_graph, +): """ - Create a procedure on graph a, and create graph b, and create a procedure with same procedure name. + Create a procedure on graph a, and create graph b, and + create a procedure with same procedure name. Then restart graph on b, and query on graph a's procedure a. """ print("[Cross query]") # create procedure on graph_a_id - a_proc_id = create_procedure(interactive_session, create_modern_graph, "test_proc", "MATCH(n: software) return count(n);") + a_proc_id = create_procedure( + interactive_session, + create_modern_graph, + "test_proc", + "MATCH(n: software) return count(n);", + ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure on graph_b_id - b_proc_id = create_procedure(interactive_session, create_vertex_only_modern_graph, "test_proc", "MATCH(n: person) return count(n);") + b_proc_id = create_procedure( + interactive_session, + create_vertex_only_modern_graph, + "test_proc", + "MATCH(n: person) return count(n);", + ) start_service_on_graph(interactive_session, create_vertex_only_modern_graph) call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id) @@ -106,33 +137,60 @@ def test_service_switching(interactive_session,neo4j_session, create_modern_grap def test_procedure_creation(interactive_session, neo4j_session, create_modern_graph): print("[Test procedure creation]") - # create procedure with description contains spaces,',', and special characters '!','@','#','$','%','^','&','*','(',')' - a_proc_id = create_procedure(interactive_session, create_modern_graph, "test_proc_1", "MATCH(n: software) return count(n);", "This is a test procedure, with special characters: !@#$%^&*()") + # create procedure with description contains spaces,',', and + # special characters '!','@','#','$','%','^','&','*','(',')' + a_proc_id = create_procedure( + interactive_session, + create_modern_graph, + "test_proc_1", + "MATCH(n: software) return count(n);", + "This is a test procedure, with special characters: !@#$%^&*()", + ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) call_procedure(neo4j_session, create_modern_graph, a_proc_id) - # create procedure with name containing space, should fail, expect to raise exception + # create procedure with name containing space, + # should fail, expect to raise exception with pytest.raises(Exception): - create_procedure(interactive_session, create_modern_graph, "test proc", "MATCH(n: software) return count(n);") - + create_procedure( + interactive_session, + create_modern_graph, + "test proc", + "MATCH(n: software) return count(n);", + ) # create procedure with invalid cypher query, should fail, expect to raise exception with pytest.raises(Exception): - create_procedure(interactive_session, create_modern_graph, "test_proc2", "MATCH(n: IDONTKOWN) return count(n)") + create_procedure( + interactive_session, + create_modern_graph, + "test_proc2", + "MATCH(n: IDONTKOWN) return count(n)", + ) + -def test_builtin_procedure(interactive_session,neo4j_session, create_modern_graph): +def test_builtin_procedure(interactive_session, neo4j_session, create_modern_graph): print("[Test builtin procedure]") # Delete the builtin procedure should fail with pytest.raises(Exception): delete_procedure(interactive_session, create_modern_graph, "count_vertices") # Create a procedure with the same name as builtin procedure should fail with pytest.raises(Exception): - create_procedure(interactive_session, create_modern_graph, "count_vertices", "MATCH(n: software) return count(n);") + create_procedure( + interactive_session, + create_modern_graph, + "count_vertices", + "MATCH(n: software) return count(n);", + ) # Update the builtin procedure should fail with pytest.raises(Exception): - update_procedure(interactive_session, create_modern_graph, "count_vertices", "A updated description") + update_procedure( + interactive_session, + create_modern_graph, + "count_vertices", + "A updated description", + ) # Call the builtin procedure start_service_on_graph(interactive_session, create_modern_graph) call_procedure(neo4j_session, create_modern_graph, "count_vertices", '"person"') - diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_utils.py b/flex/interactive/sdk/python/gs_interactive/tests/test_utils.py index a3c84a8cd936..58eb7766b618 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_utils.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_utils.py @@ -17,17 +17,14 @@ # import os -import time +import sys import unittest -import pytest - -import sys sys.path.append(os.path.join(os.path.dirname(__file__), "../../")) -from gs_interactive.client.driver import Driver -from gs_interactive.models import * -from gs_interactive.client.utils import InputFormat, append_format_byte +from gs_interactive.client.utils import InputFormat # noqa: E402 +from gs_interactive.client.utils import append_format_byte + class TestUtils(unittest.TestCase): def setUp(self): @@ -35,9 +32,11 @@ def setUp(self): def tearDown(self): pass - + def test_append_format_byte(self): input = "hello" - new_bytes = append_format_byte(input.encode(), input_format=InputFormat.CPP_ENCODER) - self.assertEqual(new_bytes, b'hello\x00') + new_bytes = append_format_byte( + input.encode(), input_format=InputFormat.CPP_ENCODER + ) + self.assertEqual(new_bytes, b"hello\x00") self.assertEqual(len(new_bytes), len(input) + 1) diff --git a/flex/interactive/sdk/python/setup.py b/flex/interactive/sdk/python/setup.py index 8e4d2ae97a7e..93c43e8ad623 100644 --- a/flex/interactive/sdk/python/setup.py +++ b/flex/interactive/sdk/python/setup.py @@ -51,7 +51,7 @@ def initialize_options(self): def finalize_options(self): pass - def generate_proto(self, proto_path, output_dir, proto_files = None): + def generate_proto(self, proto_path, output_dir, proto_files=None): if proto_files is None: proto_files = glob.glob(os.path.join(proto_path, "*.proto")) os.makedirs(output_dir, exist_ok=True) @@ -74,8 +74,16 @@ def generate_proto(self, proto_path, output_dir, proto_files = None): ) def run(self): - self.generate_proto("../../../../interactive_engine/executor/ir/proto/", "./gs_interactive/client/generated/") - self.generate_proto("../../../../proto/error", "./gs_interactive/client/generated/", ["interactive.proto"]) + self.generate_proto( + "../../../../interactive_engine/executor/ir/proto/", + "./gs_interactive/client/generated/", + ) + self.generate_proto( + "../../../../proto/error", + "./gs_interactive/client/generated/", + ["interactive.proto"], + ) + setup( name=NAME, diff --git a/flex/storages/metadata/metadata_store_factory.h b/flex/storages/metadata/metadata_store_factory.h index 181452c1c5f2..84428c0e8eeb 100644 --- a/flex/storages/metadata/metadata_store_factory.h +++ b/flex/storages/metadata/metadata_store_factory.h @@ -17,9 +17,9 @@ #define FLEX_STORAGES_METADATA_METADATA_STORE_FACTORY_H_ #include -#include "flex/storages/metadata/local_file_metadata_store.h" #include "flex/storages/metadata/default_graph_meta_store.h" #include "flex/storages/metadata/graph_meta_store.h" +#include "flex/storages/metadata/local_file_metadata_store.h" namespace gs { diff --git a/flex/utils/mmap_vector.h b/flex/utils/mmap_vector.h index 410a899940f8..7bf907b426b8 100644 --- a/flex/utils/mmap_vector.h +++ b/flex/utils/mmap_vector.h @@ -78,5 +78,5 @@ class mmap_vector { mmap_array array_; size_t size_; }; -}; // namespace gs +}; // namespace gs #endif // GRAPHSCOPE_UTILS_MMAP_VECTOR_H_